Commit 7ee44703 authored by jonas@eel.(none)'s avatar jonas@eel.(none)

Merge joreland@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  eel.(none):/home/jonas/src/mysql-5.1-new
parents d967e620 635ce25b
...@@ -30,7 +30,7 @@ private: ...@@ -30,7 +30,7 @@ private:
BUSY_LOOP = 1, BUSY_LOOP = 1,
CLEANUP_LOOP = 2, CLEANUP_LOOP = 2,
LCP_LOOP = 3, LCP_LOOP = 3,
LCP_PREPARE = 4 LCP_LOCKED = 4
}; };
}; };
......
...@@ -11203,12 +11203,6 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal) ...@@ -11203,12 +11203,6 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal, sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB); LcpPrepareReq::SignalLength, JBB);
if (lcpPtr.p->firstFragmentFlag)
{
lcpPtr.p->m_outstanding++;
sendSignal(PGMAN_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
}
}//Dblqh::sendLCP_FRAGIDREQ() }//Dblqh::sendLCP_FRAGIDREQ()
void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle) void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
......
...@@ -6,9 +6,11 @@ ndbd_redo_log_reader_SOURCES = redoLogReader/records.cpp \ ...@@ -6,9 +6,11 @@ ndbd_redo_log_reader_SOURCES = redoLogReader/records.cpp \
include $(top_srcdir)/storage/ndb/config/common.mk.am include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am
LDADD += \ ndbd_redo_log_reader_LDFLAGS = @ndb_bin_am_ldflags@ \
$(top_builddir)/storage/ndb/src/common/util/libgeneral.la \ $(top_builddir)/storage/ndb/src/libndbclient.la \
$(top_builddir)/storage/ndb/src/common/portlib/libportlib.la $(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
# Don't update the files from bitkeeper # Don't update the files from bitkeeper
%::SCCS/s.% %::SCCS/s.%
...@@ -604,6 +604,7 @@ struct Fragrecord { ...@@ -604,6 +604,7 @@ struct Fragrecord {
SLList<Page>::Head m_empty_pages; // Empty pages not in logical/physical map SLList<Page>::Head m_empty_pages; // Empty pages not in logical/physical map
Uint32 m_lcp_scan_op; Uint32 m_lcp_scan_op;
Uint32 m_lcp_keep_list;
State fragStatus; State fragStatus;
Uint32 fragTableId; Uint32 fragTableId;
...@@ -1194,8 +1195,10 @@ typedef Ptr<HostBuffer> HostBufferPtr; ...@@ -1194,8 +1195,10 @@ typedef Ptr<HostBuffer> HostBufferPtr;
STATIC_CONST( ALLOC = 0x00100000 ); // Is record allocated now STATIC_CONST( ALLOC = 0x00100000 ); // Is record allocated now
STATIC_CONST( MM_SHRINK = 0x00200000 ); // Has MM part shrunk STATIC_CONST( MM_SHRINK = 0x00200000 ); // Has MM part shrunk
STATIC_CONST( MM_GROWN = 0x00400000 ); // Has MM part grown STATIC_CONST( MM_GROWN = 0x00400000 ); // Has MM part grown
STATIC_CONST( FREE = 0x00800000 ); // On free list of page STATIC_CONST( FREED = 0x00800000 ); // Is freed
STATIC_CONST( LCP_SKIP = 0x01000000 ); // Should not be returned in LCP STATIC_CONST( LCP_SKIP = 0x01000000 ); // Should not be returned in LCP
STATIC_CONST( LCP_KEEP = 0x02000000 ); // Should be returned in LCP
STATIC_CONST( FREE = 0x02800000 ); // Is free
Uint32 get_tuple_version() const { Uint32 get_tuple_version() const {
return m_header_bits & TUP_VERSION_MASK; return m_header_bits & TUP_VERSION_MASK;
......
...@@ -172,7 +172,7 @@ void Dbtup::execTUP_ABORTREQ(Signal* signal) ...@@ -172,7 +172,7 @@ void Dbtup::execTUP_ABORTREQ(Signal* signal)
*/ */
ndbout_c("clearing ALLOC"); ndbout_c("clearing ALLOC");
tuple_ptr->m_header_bits &= ~(Uint32)Tuple_header::ALLOC; tuple_ptr->m_header_bits &= ~(Uint32)Tuple_header::ALLOC;
tuple_ptr->m_header_bits |= Tuple_header::FREE; tuple_ptr->m_header_bits |= Tuple_header::FREED;
} }
} }
......
...@@ -52,6 +52,15 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* signal) ...@@ -52,6 +52,15 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
PagePtr pagePtr; PagePtr pagePtr;
Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p); Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
ndbassert(ptr->m_header_bits & Tuple_header::FREE);
if (ptr->m_header_bits & Tuple_header::LCP_KEEP)
{
ndbassert(! (ptr->m_header_bits & Tuple_header::FREED));
ptr->m_header_bits |= Tuple_header::FREED;
return;
}
if (regTabPtr.p->m_attributes[MM].m_no_of_varsize) if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
{ {
ljam(); ljam();
...@@ -140,6 +149,15 @@ void Dbtup::initOpConnection(Operationrec* regOperPtr) ...@@ -140,6 +149,15 @@ void Dbtup::initOpConnection(Operationrec* regOperPtr)
regOperPtr->m_undo_buffer_space= 0; regOperPtr->m_undo_buffer_space= 0;
} }
static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
{
return key1.m_page_no > key2.m_page_no ||
(key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
}
void void
Dbtup::dealloc_tuple(Signal* signal, Dbtup::dealloc_tuple(Signal* signal,
Uint32 gci, Uint32 gci,
...@@ -149,8 +167,12 @@ Dbtup::dealloc_tuple(Signal* signal, ...@@ -149,8 +167,12 @@ Dbtup::dealloc_tuple(Signal* signal,
Fragrecord* regFragPtr, Fragrecord* regFragPtr,
Tablerec* regTabPtr) Tablerec* regTabPtr)
{ {
ptr->m_header_bits |= Tuple_header::FREE; Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
if (ptr->m_header_bits & Tuple_header::DISK_PART) Uint32 lcp_keep_list = regFragPtr->m_lcp_keep_list;
Uint32 bits = ptr->m_header_bits;
Uint32 extra_bits = Tuple_header::FREED;
if (bits & Tuple_header::DISK_PART)
{ {
Local_key disk; Local_key disk;
memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk)); memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
...@@ -158,6 +180,23 @@ Dbtup::dealloc_tuple(Signal* signal, ...@@ -158,6 +180,23 @@ Dbtup::dealloc_tuple(Signal* signal,
&disk, *(PagePtr*)&m_pgman.m_ptr, gci); &disk, *(PagePtr*)&m_pgman.m_ptr, gci);
} }
if (! (bits & Tuple_header::LCP_SKIP) && lcpScan_ptr_i != RNIL)
{
ScanOpPtr scanOp;
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
Local_key rowid = regOperPtr->m_tuple_location;
Local_key scanpos = scanOp.p->m_scanPos.m_key;
rowid.m_page_no = page->frag_page_id;
if (rowid >= scanpos)
{
extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
ptr->m_operation_ptr_i = lcp_keep_list;
regFragPtr->m_lcp_keep_list = rowid.ref();
}
}
ptr->m_header_bits = bits | extra_bits;
if (regTabPtr->m_bits & Tablerec::TR_RowGCI) if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
{ {
jam(); jam();
...@@ -165,15 +204,6 @@ Dbtup::dealloc_tuple(Signal* signal, ...@@ -165,15 +204,6 @@ Dbtup::dealloc_tuple(Signal* signal,
} }
} }
static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
{
return key1.m_page_no > key2.m_page_no ||
(key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
}
void void
Dbtup::commit_operation(Signal* signal, Dbtup::commit_operation(Signal* signal,
Uint32 gci, Uint32 gci,
......
...@@ -287,8 +287,24 @@ Dbtup::restart_setup_page(Disk_alloc_info& alloc, Ptr<Page> pagePtr) ...@@ -287,8 +287,24 @@ Dbtup::restart_setup_page(Disk_alloc_info& alloc, Ptr<Page> pagePtr)
extentPtr.p->m_free_space += (real_free - estimated); extentPtr.p->m_free_space += (real_free - estimated);
update_extent_pos(alloc, extentPtr); update_extent_pos(alloc, extentPtr);
} }
}
#ifdef VM_TRACE
{
Local_key page;
page.m_file_no = pagePtr.p->m_file_no;
page.m_page_no = pagePtr.p->m_page_no;
Tablespace_client tsman(0, c_tsman,
0, 0, 0);
unsigned uncommitted, committed;
uncommitted = committed = ~(unsigned)0;
int ret = tsman.get_page_free_bits(&page, &uncommitted, &committed);
idx = alloc.calc_page_free_bits(real_free);
ddassert(idx == committed);
}
#endif
}
/** /**
* - Page free bits - * - Page free bits -
...@@ -743,10 +759,6 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal, ...@@ -743,10 +759,6 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
update_extent_pos(alloc, extentPtr); update_extent_pos(alloc, extentPtr);
} }
else
{
ndbout << endl;
}
{ {
Page_request_list list(c_page_request_pool, Page_request_list list(c_page_request_pool,
...@@ -771,15 +783,14 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr) ...@@ -771,15 +783,14 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
key.m_page_no = pagePtr.p->m_page_no; key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no; key.m_file_no = pagePtr.p->m_file_no;
pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
if (DBG_DISK) if (DBG_DISK)
ndbout << " disk_page_set_dirty " << key << endl; ndbout << " disk_page_set_dirty " << key << endl;
Uint32 tableId = pagePtr.p->m_table_id; Uint32 tableId = pagePtr.p->m_table_id;
Uint32 fragId = pagePtr.p->m_fragment_id; Uint32 fragId = pagePtr.p->m_fragment_id;
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
Ptr<Tablerec> tabPtr; Ptr<Tablerec> tabPtr;
tabPtr.i= pagePtr.p->m_table_id; tabPtr.i= pagePtr.p->m_table_id;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec); ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
...@@ -789,19 +800,28 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr) ...@@ -789,19 +800,28 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info; Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq)) if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{ {
jam();
restart_setup_page(alloc, pagePtr); restart_setup_page(alloc, pagePtr);
idx = alloc.calc_page_free_bits(free);
used = 0;
} }
else
{
idx &= ~0x8000;
ddassert(idx == alloc.calc_page_free_bits(free - used));
}
ddassert(free >= used);
Tablespace_client tsman(0, c_tsman, Tablespace_client tsman(0, c_tsman,
fragPtr.p->fragTableId, fragPtr.p->fragTableId,
fragPtr.p->fragmentId, fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id); fragPtr.p->m_tablespace_id);
ddassert(free >= used);
idx= alloc.calc_page_free_bits(free - used);
pagePtr.p->list_index = idx; pagePtr.p->list_index = idx;
ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool; ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]); LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
...@@ -1340,9 +1360,15 @@ Dbtup::disk_restart_undo_callback(Signal* signal, ...@@ -1340,9 +1360,15 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += pagePtr.p->m_page_header.m_page_lsn_lo; lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
undo->m_page_ptr = pagePtr;
if (undo->m_lsn <= lsn) if (undo->m_lsn <= lsn)
{ {
undo->m_page_ptr = pagePtr; if (DBG_UNDO)
{
ndbout << "apply: " << undo->m_lsn << "(" << lsn << " )"
<< key << " type: " << undo->m_type << endl;
}
update = true; update = true;
if (DBG_UNDO) if (DBG_UNDO)
...@@ -1368,13 +1394,17 @@ Dbtup::disk_restart_undo_callback(Signal* signal, ...@@ -1368,13 +1394,17 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
ndbout << "disk_restart_undo: " << undo->m_type << " " ndbout << "disk_restart_undo: " << undo->m_type << " "
<< undo->m_key << endl; << undo->m_key << endl;
disk_restart_undo_page_bits(signal, undo);
lsn = undo->m_lsn - 1; // make sure undo isn't run again... lsn = undo->m_lsn - 1; // make sure undo isn't run again...
m_pgman.update_lsn(undo->m_key, lsn); m_pgman.update_lsn(undo->m_key, lsn);
} }
else if (DBG_UNDO)
{
ndbout << "ignore: " << undo->m_lsn << "(" << lsn << " )"
<< key << " type: " << undo->m_type << endl;
}
disk_restart_undo_page_bits(signal, undo);
disk_restart_undo_next(signal); disk_restart_undo_next(signal);
} }
......
...@@ -138,7 +138,8 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) ...@@ -138,7 +138,8 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regFragPtr.p->fragmentId= fragId; regFragPtr.p->fragmentId= fragId;
regFragPtr.p->m_tablespace_id= tablespace; regFragPtr.p->m_tablespace_id= tablespace;
regFragPtr.p->m_undo_complete= false; regFragPtr.p->m_undo_complete= false;
regFragPtr.p->m_lcp_scan_op= RNIL; regFragPtr.p->m_lcp_scan_op = RNIL;
regFragPtr.p->m_lcp_keep_list = RNIL;
Uint32 noAllocatedPages= allocFragPages(regFragPtr.p, pages); Uint32 noAllocatedPages= allocFragPages(regFragPtr.p, pages);
......
...@@ -587,8 +587,14 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -587,8 +587,14 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
Uint32 foundGCI; Uint32 foundGCI;
bool mm = (bits & ScanOp::SCAN_DD); bool mm = (bits & ScanOp::SCAN_DD);
bool lcp = (bits & ScanOp::SCAN_LCP);
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
Uint32 size = table.m_offsets[mm].m_fix_header_size + Uint32 size = table.m_offsets[mm].m_fix_header_size +
(bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0); (bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0);
if (lcp && lcp_list != RNIL)
goto found_lcp_keep;
while (true) { while (true) {
switch (pos.m_get) { switch (pos.m_get) {
case ScanPos::Get_next_page: case ScanPos::Get_next_page:
...@@ -864,6 +870,53 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -864,6 +870,53 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
signal->theData[1] = scanPtr.i; signal->theData[1] = scanPtr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
return false; return false;
found_lcp_keep:
Local_key tmp;
tmp.assref(lcp_list);
tmp.m_page_no = getRealpid(fragPtr.p, tmp.m_page_no);
Ptr<Page> pagePtr;
c_page_pool.getPtr(pagePtr, tmp.m_page_no);
Tuple_header* ptr = (Tuple_header*)
((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
Uint32 headerbits = ptr->m_header_bits;
ndbrequire(headerbits & Tuple_header::LCP_KEEP);
Uint32 next = ptr->m_operation_ptr_i;
ptr->m_operation_ptr_i = RNIL;
ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
jam();
setChecksum(ptr, tablePtr.p);
}
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
conf->accOperationPtr = RNIL + 1;
conf->fragId = frag.fragmentId;
conf->localKey[0] = lcp_list;
conf->localKey[1] = 0;
conf->localKeyLength = 1;
conf->gci = 0;
Uint32 blockNo = refToBlock(scan.m_userRef);
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
fragPtr.p->m_lcp_keep_list = next;
ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
if (headerbits & Tuple_header::FREED)
{
if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
{
jam();
free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
} else {
jam();
free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
}
}
return false;
} }
void void
......
...@@ -280,6 +280,16 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){ ...@@ -280,6 +280,16 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){
waiter.p->m_size, waiter.p->m_size,
2*File_formats::UNDO_PAGE_WORDS); 2*File_formats::UNDO_PAGE_WORDS);
} }
if (!ptr.p->m_log_sync_waiters.isEmpty())
{
LocalDLFifoList<Log_waiter>
list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
Ptr<Log_waiter> waiter;
list.first(waiter);
infoEvent(" m_last_synced_lsn: %lld: %d head(waiters).m_sync_lsn: %lld",
ptr.p->m_last_synced_lsn,
waiter.p->m_sync_lsn);
}
m_logfile_group_list.next(ptr); m_logfile_group_list.next(ptr);
} }
} }
......
...@@ -70,7 +70,6 @@ Pgman::Pgman(const Configuration & conf) : ...@@ -70,7 +70,6 @@ Pgman::Pgman(const Configuration & conf) :
addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true); addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true);
addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF); addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF);
addRecSignal(GSN_LCP_PREPARE_REQ, &Pgman::execLCP_PREPARE_REQ);
addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD); addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD);
addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ); addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ);
...@@ -224,20 +223,20 @@ Pgman::execCONTINUEB(Signal* signal) ...@@ -224,20 +223,20 @@ Pgman::execCONTINUEB(Signal* signal)
jam(); jam();
do_lcp_loop(signal); do_lcp_loop(signal);
break; break;
case PgmanContinueB::LCP_PREPARE: case PgmanContinueB::LCP_LOCKED:
{ {
jam(); jam();
Ptr<Page_entry> ptr; Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED]; Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.getPtr(ptr, data1); if (data1 != RNIL)
if (pl.next(ptr))
{ {
process_lcp_prepare(signal, ptr); pl.getPtr(ptr, data1);
process_lcp_locked(signal, ptr);
} }
else else
{ {
signal->theData[0] = 0; signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB); sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
} }
return; return;
} }
...@@ -1105,90 +1104,6 @@ Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr) ...@@ -1105,90 +1104,6 @@ Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr)
// LCP // LCP
void
Pgman::execLCP_PREPARE_REQ(Signal* signal)
{
jamEntry();
/**
* Reserve pages for all LOCKED pages...
*/
Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
if (pl.first(ptr))
{
process_lcp_prepare(signal, ptr);
}
else
{
signal->theData[0] = 0;
sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
}
}
void
Pgman::process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr)
{
ndbrequire(ptr.p->m_copy_page_i == RNIL);
Ptr<GlobalPage> copy;
ndbrequire(m_global_page_pool.seize(copy));
ptr.p->m_copy_page_i = copy.i;
DBG_LCP("assigning copy page to " << ptr << endl);
signal->theData[0] = PgmanContinueB::LCP_PREPARE;
signal->theData[1] = ptr.i;
sendSignal(PGMAN_REF, GSN_CONTINUEB, signal, 2, JBB);
}
int
Pgman::create_copy_page(Ptr<Page_entry> ptr, Uint32 req_flags)
{
DBG_LCP(<< ptr << " create_copy_page ");
if (! (req_flags & DIRTY_FLAGS) && ! (ptr.p->m_state & Page_entry::COPY))
{
DBG_LCP(" return original" << endl);
return ptr.p->m_real_page_i;
}
if (! (ptr.p->m_state & Page_entry::COPY))
{
ptr.p->m_state |= Page_entry::COPY;
Ptr<GlobalPage> src;
Ptr<GlobalPage> copy;
m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
memcpy(copy.p, src.p, sizeof(GlobalPage));
DBG_LCP("making copy... ");
}
DBG_LCP("return " << ptr.p->m_copy_page_i);
return ptr.p->m_copy_page_i;
}
void
Pgman::restore_copy_page(Ptr<Page_entry> ptr)
{
DBG_LCP(ptr << " restore_copy_page");
Uint32 copyPtrI = ptr.p->m_copy_page_i;
if (ptr.p->m_state & Page_entry::COPY)
{
DBG_LCP(" copy back");
Ptr<GlobalPage> src;
Ptr<GlobalPage> copy;
m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
m_global_page_pool.getPtr(copy, copyPtrI);
memcpy(src.p, copy.p, sizeof(GlobalPage));
}
m_global_page_pool.release(copyPtrI);
DBG_LCP(endl);
ptr.p->m_state &= ~Page_entry::COPY;
ptr.p->m_copy_page_i = RNIL;
}
void void
Pgman::execLCP_FRAG_ORD(Signal* signal) Pgman::execLCP_FRAG_ORD(Signal* signal)
...@@ -1196,10 +1111,7 @@ Pgman::execLCP_FRAG_ORD(Signal* signal) ...@@ -1196,10 +1111,7 @@ Pgman::execLCP_FRAG_ORD(Signal* signal)
LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr(); LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr();
ndbrequire(ord->lcpId >= m_last_lcp_complete + 1 || m_last_lcp_complete == 0); ndbrequire(ord->lcpId >= m_last_lcp_complete + 1 || m_last_lcp_complete == 0);
m_last_lcp = ord->lcpId; m_last_lcp = ord->lcpId;
DBG_LCP("execLCP_FRAG_ORD" << endl); DBG_LCP("Pgman::execLCP_FRAG_ORD lcp: " << m_last_lcp << endl);
ndbrequire(!m_lcp_outstanding);
m_lcp_curr_bucket = 0;
#ifdef VM_TRACE #ifdef VM_TRACE
debugOut debugOut
...@@ -1207,8 +1119,6 @@ Pgman::execLCP_FRAG_ORD(Signal* signal) ...@@ -1207,8 +1119,6 @@ Pgman::execLCP_FRAG_ORD(Signal* signal)
<< " this=" << m_last_lcp << " last_complete=" << m_last_lcp_complete << " this=" << m_last_lcp << " last_complete=" << m_last_lcp_complete
<< " bucket=" << m_lcp_curr_bucket << endl; << " bucket=" << m_lcp_curr_bucket << endl;
#endif #endif
do_lcp_loop(signal, true);
} }
void void
...@@ -1219,6 +1129,9 @@ Pgman::execEND_LCP_REQ(Signal* signal) ...@@ -1219,6 +1129,9 @@ Pgman::execEND_LCP_REQ(Signal* signal)
DBG_LCP("execEND_LCP_REQ" << endl); DBG_LCP("execEND_LCP_REQ" << endl);
ndbrequire(!m_lcp_outstanding);
m_lcp_curr_bucket = 0;
#ifdef VM_TRACE #ifdef VM_TRACE
debugOut debugOut
<< "PGMAN: execEND_LCP_REQ" << "PGMAN: execEND_LCP_REQ"
...@@ -1227,15 +1140,9 @@ Pgman::execEND_LCP_REQ(Signal* signal) ...@@ -1227,15 +1140,9 @@ Pgman::execEND_LCP_REQ(Signal* signal)
<< " outstanding=" << m_lcp_outstanding << endl; << " outstanding=" << m_lcp_outstanding << endl;
#endif #endif
if (m_last_lcp == m_last_lcp_complete)
{
ndbrequire(! m_lcp_loop_on);
signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
DBG_LCP("GSN_END_LCP_CONF" << endl);
}
m_last_lcp_complete = m_last_lcp; m_last_lcp_complete = m_last_lcp;
do_lcp_loop(signal, true);
} }
bool bool
...@@ -1257,9 +1164,6 @@ Pgman::process_lcp(Signal* signal) ...@@ -1257,9 +1164,6 @@ Pgman::process_lcp(Signal* signal)
// start or re-start from beginning of current hash bucket // start or re-start from beginning of current hash bucket
if (m_lcp_curr_bucket != ~(Uint32)0) if (m_lcp_curr_bucket != ~(Uint32)0)
{ {
DBG_LCP(" PROCESS LCP m_lcp_curr_bucket"
<< m_lcp_curr_bucket << endl);
Page_hashlist::Iterator iter; Page_hashlist::Iterator iter;
pl_hash.next(m_lcp_curr_bucket, iter); pl_hash.next(m_lcp_curr_bucket, iter);
Uint32 loop = 0; Uint32 loop = 0;
...@@ -1270,15 +1174,11 @@ Pgman::process_lcp(Signal* signal) ...@@ -1270,15 +1174,11 @@ Pgman::process_lcp(Signal* signal)
Ptr<Page_entry>& ptr = iter.curr; Ptr<Page_entry>& ptr = iter.curr;
Page_state state = ptr.p->m_state; Page_state state = ptr.p->m_state;
DBG_LCP("LCP " DBG_LCP("LCP " << ptr << " - ");
<< " m_lcp_outstanding: " << m_lcp_outstanding
<< " max_count: " << max_count
<< " loop: " << loop
<< " iter.curr.i: " << iter.curr.i
<< " " << ptr);
if (ptr.p->m_last_lcp < m_last_lcp && if (ptr.p->m_last_lcp < m_last_lcp &&
(state & Page_entry::DIRTY)) (state & Page_entry::DIRTY) &&
(! (state & Page_entry::LOCKED)))
{ {
if(! (state & Page_entry::BOUND)) if(! (state & Page_entry::BOUND))
{ {
...@@ -1290,7 +1190,7 @@ Pgman::process_lcp(Signal* signal) ...@@ -1290,7 +1190,7 @@ Pgman::process_lcp(Signal* signal)
DBG_LCP(" BUSY" << endl); DBG_LCP(" BUSY" << endl);
break; // wait for it break; // wait for it
} }
if (state & Page_entry::PAGEOUT) else if (state & Page_entry::PAGEOUT)
{ {
DBG_LCP(" PAGEOUT -> state |= LCP" << endl); DBG_LCP(" PAGEOUT -> state |= LCP" << endl);
set_page_state(ptr, state | Page_entry::LCP); set_page_state(ptr, state | Page_entry::LCP);
...@@ -1306,11 +1206,6 @@ Pgman::process_lcp(Signal* signal) ...@@ -1306,11 +1206,6 @@ Pgman::process_lcp(Signal* signal)
ptr.p->m_last_lcp = m_last_lcp; ptr.p->m_last_lcp = m_last_lcp;
m_lcp_outstanding++; m_lcp_outstanding++;
} }
else if (ptr.p->m_copy_page_i != RNIL)
{
DBG_LCP(" NOT DIRTY" << endl);
restore_copy_page(ptr);
}
else else
{ {
DBG_LCP(" NOT DIRTY" << endl); DBG_LCP(" NOT DIRTY" << endl);
...@@ -1323,22 +1218,68 @@ Pgman::process_lcp(Signal* signal) ...@@ -1323,22 +1218,68 @@ Pgman::process_lcp(Signal* signal)
if (m_lcp_curr_bucket == ~(Uint32)0 && !m_lcp_outstanding) if (m_lcp_curr_bucket == ~(Uint32)0 && !m_lcp_outstanding)
{ {
if (m_last_lcp == m_last_lcp_complete) Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
if (pl.first(ptr))
{
process_lcp_locked(signal, ptr);
}
else
{ {
signal->theData[0] = m_end_lcp_req.senderData; signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB); sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
DBG_LCP("GSN_END_LCP_CONF" << endl);
} }
DBG_LCP(" -- RETURN FALSE" << endl);
m_last_lcp_complete = m_last_lcp;
m_lcp_curr_bucket = ~(Uint32)0;
return false; return false;
} }
DBG_LCP(" -- RETURN TRUE" << endl);
return true; return true;
} }
void
Pgman::process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr)
{
ptr.p->m_last_lcp = m_last_lcp;
if (ptr.p->m_state & Page_entry::DIRTY)
{
Ptr<GlobalPage> org, copy;
ndbrequire(m_global_page_pool.seize(copy));
m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
memcpy(copy.p, org.p, sizeof(GlobalPage));
ptr.p->m_copy_page_i = copy.i;
m_lcp_outstanding++;
ptr.p->m_state |= Page_entry::LCP;
pageout(signal, ptr);
return;
}
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.next(ptr);
signal->theData[0] = PgmanContinueB::LCP_LOCKED;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}
void
Pgman::process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
{
Ptr<GlobalPage> org, copy;
m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
memcpy(org.p, copy.p, sizeof(GlobalPage));
m_global_page_pool.release(copy);
ptr.p->m_copy_page_i = RNIL;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.next(ptr);
signal->theData[0] = PgmanContinueB::LCP_LOCKED;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}
// page read and write // page read and write
void void
...@@ -1374,7 +1315,7 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr) ...@@ -1374,7 +1315,7 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr)
ndbrequire(m_stats.m_current_io_waits > 0); ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--; m_stats.m_current_io_waits--;
ptr.p->m_last_lcp = m_last_lcp; ptr.p->m_last_lcp = m_last_lcp_complete;
do_busy_loop(signal, true); do_busy_loop(signal, true);
} }
...@@ -1460,18 +1401,16 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr) ...@@ -1460,18 +1401,16 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
ndbrequire(m_stats.m_current_io_waits > 0); ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--; m_stats.m_current_io_waits--;
if (ptr.p->m_copy_page_i != RNIL)
{
jam();
restore_copy_page(ptr);
state &= ~ Page_entry::COPY;
}
if (state & Page_entry::LCP) if (state & Page_entry::LCP)
{ {
ndbrequire(m_lcp_outstanding); ndbrequire(m_lcp_outstanding);
m_lcp_outstanding--; m_lcp_outstanding--;
state &= ~ Page_entry::LCP; state &= ~ Page_entry::LCP;
if (ptr.p->m_copy_page_i != RNIL)
{
process_lcp_locked_fswriteconf(signal, ptr);
}
} }
set_page_state(ptr, state); set_page_state(ptr, state);
...@@ -1625,9 +1564,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req) ...@@ -1625,9 +1564,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
! (req_flags & Page_request::UNLOCK_PAGE)) ! (req_flags & Page_request::UNLOCK_PAGE))
{ {
ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0); ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0);
if (m_lcp_loop_on && ptr.p->m_copy_page_i != RNIL) if (ptr.p->m_copy_page_i != RNIL)
{ {
return create_copy_page(ptr, req_flags); return ptr.p->m_copy_page_i;
} }
return ptr.p->m_real_page_i; return ptr.p->m_real_page_i;
......
...@@ -305,7 +305,6 @@ private: ...@@ -305,7 +305,6 @@ private:
,PAGEIN = 0x0100 // paging in ,PAGEIN = 0x0100 // paging in
,PAGEOUT = 0x0200 // paging out ,PAGEOUT = 0x0200 // paging out
,LOGSYNC = 0x0400 // undo WAL as part of pageout ,LOGSYNC = 0x0400 // undo WAL as part of pageout
,COPY = 0x0800 // Copy page for LCP
,LCP = 0x1000 // page is LCP flushed ,LCP = 0x1000 // page is LCP flushed
,HOT = 0x2000 // page is hot ,HOT = 0x2000 // page is hot
,ONSTACK = 0x4000 // page is on LIRS stack ,ONSTACK = 0x4000 // page is on LIRS stack
...@@ -419,7 +418,6 @@ protected: ...@@ -419,7 +418,6 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal); void execREAD_CONFIG_REQ(Signal* signal);
void execCONTINUEB(Signal* signal); void execCONTINUEB(Signal* signal);
void execLCP_PREPARE_REQ(Signal* signal);
void execLCP_FRAG_ORD(Signal*); void execLCP_FRAG_ORD(Signal*);
void execEND_LCP_REQ(Signal*); void execEND_LCP_REQ(Signal*);
...@@ -462,9 +460,8 @@ private: ...@@ -462,9 +460,8 @@ private:
void move_cleanup_ptr(Ptr<Page_entry> ptr); void move_cleanup_ptr(Ptr<Page_entry> ptr);
bool process_lcp(Signal*); bool process_lcp(Signal*);
void process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr); void process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr);
int create_copy_page(Ptr<Page_entry>, Uint32 req_flags); void process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr);
void restore_copy_page(Ptr<Page_entry>);
void pagein(Signal*, Ptr<Page_entry>); void pagein(Signal*, Ptr<Page_entry>);
void fsreadreq(Signal*, Ptr<Page_entry>); void fsreadreq(Signal*, Ptr<Page_entry>);
......
...@@ -1777,7 +1777,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal, ...@@ -1777,7 +1777,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
/** /**
* Handling of unmapped extent header pages is not implemented * Handling of unmapped extent header pages is not implemented
*/ */
int flags = 0; int flags = Page_cache_client::DIRTY_REQ;
int real_page_id; int real_page_id;
if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0) if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{ {
...@@ -1805,31 +1805,20 @@ Tsman::restart_undo_page_free_bits(Signal* signal, ...@@ -1805,31 +1805,20 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += page->m_page_header.m_page_lsn_lo; lsn += page->m_page_header.m_page_lsn_lo;
if (undo_lsn <= lsn)
{
/** /**
* Toggle word * Toggle word
*/ */
if (DBG_UNDO) if (DBG_UNDO)
ndbout_c("tsman: apply %lld(%lld) %x -> %x", {
undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT))); ndbout << "tsman: apply " << undo_lsn << "(" << lsn << ") "
<< *key << " " << (src & COMMITTED_MASK)
<< " -> " << bits << endl;
}
lsn = undo_lsn;
page->m_page_header.m_page_lsn_hi = lsn >> 32;
page->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
ndbassert((bits & ~(COMMITTED_MASK)) == 0); ndbassert((bits & ~(COMMITTED_MASK)) == 0);
header->update_free_bits(page_no_in_extent, header->update_free_bits(page_no_in_extent,
bits | (bits << UNCOMMITTED_SHIFT)); bits | (bits << UNCOMMITTED_SHIFT));
m_page_cache_client.update_lsn(preq.m_page, lsn);
}
else
{
if (DBG_UNDO)
ndbout_c("tsman: apply %lld(%lld) %x -> %x",
undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
}
return 0; return 0;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment