Commit 33cd363d authored by unknown's avatar unknown

Merge joreland@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  eel.(none):/home/jonas/src/mysql-5.1-new


storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Auto merged
parents 7384eddb cd976d16
......@@ -30,7 +30,7 @@ private:
BUSY_LOOP = 1,
CLEANUP_LOOP = 2,
LCP_LOOP = 3,
LCP_PREPARE = 4
LCP_LOCKED = 4
};
};
......
......@@ -11203,12 +11203,6 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
if (lcpPtr.p->firstFragmentFlag)
{
lcpPtr.p->m_outstanding++;
sendSignal(PGMAN_REF, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
}
}//Dblqh::sendLCP_FRAGIDREQ()
void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
......
......@@ -6,9 +6,11 @@ ndbd_redo_log_reader_SOURCES = redoLogReader/records.cpp \
include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_kernel.mk.am
LDADD += \
$(top_builddir)/storage/ndb/src/common/util/libgeneral.la \
$(top_builddir)/storage/ndb/src/common/portlib/libportlib.la
ndbd_redo_log_reader_LDFLAGS = @ndb_bin_am_ldflags@ \
$(top_builddir)/storage/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
# Don't update the files from bitkeeper
%::SCCS/s.%
......@@ -604,6 +604,7 @@ struct Fragrecord {
SLList<Page>::Head m_empty_pages; // Empty pages not in logical/physical map
Uint32 m_lcp_scan_op;
Uint32 m_lcp_keep_list;
State fragStatus;
Uint32 fragTableId;
......@@ -1194,8 +1195,10 @@ typedef Ptr<HostBuffer> HostBufferPtr;
STATIC_CONST( ALLOC = 0x00100000 ); // Is record allocated now
STATIC_CONST( MM_SHRINK = 0x00200000 ); // Has MM part shrunk
STATIC_CONST( MM_GROWN = 0x00400000 ); // Has MM part grown
STATIC_CONST( FREE = 0x00800000 ); // On free list of page
STATIC_CONST( FREED = 0x00800000 ); // Is freed
STATIC_CONST( LCP_SKIP = 0x01000000 ); // Should not be returned in LCP
STATIC_CONST( LCP_KEEP = 0x02000000 ); // Should be returned in LCP
STATIC_CONST( FREE = 0x02800000 ); // Is free
Uint32 get_tuple_version() const {
return m_header_bits & TUP_VERSION_MASK;
......
......@@ -172,7 +172,7 @@ void Dbtup::execTUP_ABORTREQ(Signal* signal)
*/
ndbout_c("clearing ALLOC");
tuple_ptr->m_header_bits &= ~(Uint32)Tuple_header::ALLOC;
tuple_ptr->m_header_bits |= Tuple_header::FREE;
tuple_ptr->m_header_bits |= Tuple_header::FREED;
}
}
......
......@@ -51,6 +51,15 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
PagePtr pagePtr;
Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
ndbassert(ptr->m_header_bits & Tuple_header::FREE);
if (ptr->m_header_bits & Tuple_header::LCP_KEEP)
{
ndbassert(! (ptr->m_header_bits & Tuple_header::FREED));
ptr->m_header_bits |= Tuple_header::FREED;
return;
}
if (regTabPtr.p->m_attributes[MM].m_no_of_varsize)
{
......@@ -140,6 +149,15 @@ void Dbtup::initOpConnection(Operationrec* regOperPtr)
regOperPtr->m_undo_buffer_space= 0;
}
static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
{
return key1.m_page_no > key2.m_page_no ||
(key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
}
void
Dbtup::dealloc_tuple(Signal* signal,
Uint32 gci,
......@@ -149,15 +167,36 @@ Dbtup::dealloc_tuple(Signal* signal,
Fragrecord* regFragPtr,
Tablerec* regTabPtr)
{
ptr->m_header_bits |= Tuple_header::FREE;
if (ptr->m_header_bits & Tuple_header::DISK_PART)
Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
Uint32 lcp_keep_list = regFragPtr->m_lcp_keep_list;
Uint32 bits = ptr->m_header_bits;
Uint32 extra_bits = Tuple_header::FREED;
if (bits & Tuple_header::DISK_PART)
{
Local_key disk;
memcpy(&disk, ptr->get_disk_ref_ptr(regTabPtr), sizeof(disk));
disk_page_free(signal, regTabPtr, regFragPtr,
&disk, *(PagePtr*)&m_pgman.m_ptr, gci);
}
if (! (bits & Tuple_header::LCP_SKIP) && lcpScan_ptr_i != RNIL)
{
ScanOpPtr scanOp;
c_scanOpPool.getPtr(scanOp, lcpScan_ptr_i);
Local_key rowid = regOperPtr->m_tuple_location;
Local_key scanpos = scanOp.p->m_scanPos.m_key;
rowid.m_page_no = page->frag_page_id;
if (rowid >= scanpos)
{
extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
ptr->m_operation_ptr_i = lcp_keep_list;
regFragPtr->m_lcp_keep_list = rowid.ref();
}
}
ptr->m_header_bits = bits | extra_bits;
if (regTabPtr->m_bits & Tablerec::TR_RowGCI)
{
jam();
......@@ -165,15 +204,6 @@ Dbtup::dealloc_tuple(Signal* signal,
}
}
static
inline
bool
operator>=(const Local_key& key1, const Local_key& key2)
{
return key1.m_page_no > key2.m_page_no ||
(key1.m_page_no == key2.m_page_no && key1.m_page_idx >= key2.m_page_idx);
}
void
Dbtup::commit_operation(Signal* signal,
Uint32 gci,
......
......@@ -287,8 +287,24 @@ Dbtup::restart_setup_page(Disk_alloc_info& alloc, Ptr<Page> pagePtr)
extentPtr.p->m_free_space += (real_free - estimated);
update_extent_pos(alloc, extentPtr);
}
}
#ifdef VM_TRACE
{
Local_key page;
page.m_file_no = pagePtr.p->m_file_no;
page.m_page_no = pagePtr.p->m_page_no;
Tablespace_client tsman(0, c_tsman,
0, 0, 0);
unsigned uncommitted, committed;
uncommitted = committed = ~(unsigned)0;
int ret = tsman.get_page_free_bits(&page, &uncommitted, &committed);
idx = alloc.calc_page_free_bits(real_free);
ddassert(idx == committed);
}
#endif
}
/**
* - Page free bits -
......@@ -743,10 +759,6 @@ Dbtup::disk_page_prealloc_callback_common(Signal* signal,
update_extent_pos(alloc, extentPtr);
}
else
{
ndbout << endl;
}
{
Page_request_list list(c_page_request_pool,
......@@ -771,15 +783,14 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
pagePtr.p->nextList = pagePtr.p->prevList = RNIL;
if (DBG_DISK)
ndbout << " disk_page_set_dirty " << key << endl;
Uint32 tableId = pagePtr.p->m_table_id;
Uint32 fragId = pagePtr.p->m_fragment_id;
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
Ptr<Tablerec> tabPtr;
tabPtr.i= pagePtr.p->m_table_id;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
......@@ -789,19 +800,28 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
Uint32 free = pagePtr.p->free_space;
Uint32 used = pagePtr.p->uncommitted_used_space;
if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{
jam();
restart_setup_page(alloc, pagePtr);
idx = alloc.calc_page_free_bits(free);
used = 0;
}
else
{
idx &= ~0x8000;
ddassert(idx == alloc.calc_page_free_bits(free - used));
}
ddassert(free >= used);
Tablespace_client tsman(0, c_tsman,
fragPtr.p->fragTableId,
fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id);
ddassert(free >= used);
idx= alloc.calc_page_free_bits(free - used);
pagePtr.p->list_index = idx;
ArrayPool<Page> *pool= (ArrayPool<Page>*)&m_global_page_pool;
LocalDLList<Page> list(*pool, alloc.m_dirty_pages[idx]);
......@@ -1339,10 +1359,16 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
Uint64 lsn = 0;
lsn += pagePtr.p->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += pagePtr.p->m_page_header.m_page_lsn_lo;
undo->m_page_ptr = pagePtr;
if (undo->m_lsn <= lsn)
{
undo->m_page_ptr = pagePtr;
if (DBG_UNDO)
{
ndbout << "apply: " << undo->m_lsn << "(" << lsn << " )"
<< key << " type: " << undo->m_type << endl;
}
update = true;
if (DBG_UNDO)
......@@ -1368,13 +1394,17 @@ Dbtup::disk_restart_undo_callback(Signal* signal,
ndbout << "disk_restart_undo: " << undo->m_type << " "
<< undo->m_key << endl;
disk_restart_undo_page_bits(signal, undo);
lsn = undo->m_lsn - 1; // make sure undo isn't run again...
m_pgman.update_lsn(undo->m_key, lsn);
}
else if (DBG_UNDO)
{
ndbout << "ignore: " << undo->m_lsn << "(" << lsn << " )"
<< key << " type: " << undo->m_type << endl;
}
disk_restart_undo_page_bits(signal, undo);
disk_restart_undo_next(signal);
}
......
......@@ -138,8 +138,9 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
regFragPtr.p->fragmentId= fragId;
regFragPtr.p->m_tablespace_id= tablespace;
regFragPtr.p->m_undo_complete= false;
regFragPtr.p->m_lcp_scan_op= RNIL;
regFragPtr.p->m_lcp_scan_op = RNIL;
regFragPtr.p->m_lcp_keep_list = RNIL;
Uint32 noAllocatedPages= allocFragPages(regFragPtr.p, pages);
if (noAllocatedPages == 0) {
......
......@@ -587,8 +587,14 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
Uint32 foundGCI;
bool mm = (bits & ScanOp::SCAN_DD);
bool lcp = (bits & ScanOp::SCAN_LCP);
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
Uint32 size = table.m_offsets[mm].m_fix_header_size +
(bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0);
if (lcp && lcp_list != RNIL)
goto found_lcp_keep;
while (true) {
switch (pos.m_get) {
case ScanPos::Get_next_page:
......@@ -864,6 +870,53 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
signal->theData[1] = scanPtr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
return false;
found_lcp_keep:
Local_key tmp;
tmp.assref(lcp_list);
tmp.m_page_no = getRealpid(fragPtr.p, tmp.m_page_no);
Ptr<Page> pagePtr;
c_page_pool.getPtr(pagePtr, tmp.m_page_no);
Tuple_header* ptr = (Tuple_header*)
((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
Uint32 headerbits = ptr->m_header_bits;
ndbrequire(headerbits & Tuple_header::LCP_KEEP);
Uint32 next = ptr->m_operation_ptr_i;
ptr->m_operation_ptr_i = RNIL;
ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
jam();
setChecksum(ptr, tablePtr.p);
}
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr;
conf->accOperationPtr = RNIL + 1;
conf->fragId = frag.fragmentId;
conf->localKey[0] = lcp_list;
conf->localKey[1] = 0;
conf->localKeyLength = 1;
conf->gci = 0;
Uint32 blockNo = refToBlock(scan.m_userRef);
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 7);
fragPtr.p->m_lcp_keep_list = next;
ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
if (headerbits & Tuple_header::FREED)
{
if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
{
jam();
free_var_rec(fragPtr.p, tablePtr.p, &tmp, pagePtr);
} else {
jam();
free_fix_rec(fragPtr.p, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
}
}
return false;
}
void
......
......@@ -280,6 +280,16 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){
waiter.p->m_size,
2*File_formats::UNDO_PAGE_WORDS);
}
if (!ptr.p->m_log_sync_waiters.isEmpty())
{
LocalDLFifoList<Log_waiter>
list(m_log_waiter_pool, ptr.p->m_log_sync_waiters);
Ptr<Log_waiter> waiter;
list.first(waiter);
infoEvent(" m_last_synced_lsn: %lld: %d head(waiters).m_sync_lsn: %lld",
ptr.p->m_last_synced_lsn,
waiter.p->m_sync_lsn);
}
m_logfile_group_list.next(ptr);
}
}
......
......@@ -70,7 +70,6 @@ Pgman::Pgman(const Configuration & conf) :
addRecSignal(GSN_FSWRITEREF, &Pgman::execFSWRITEREF, true);
addRecSignal(GSN_FSWRITECONF, &Pgman::execFSWRITECONF);
addRecSignal(GSN_LCP_PREPARE_REQ, &Pgman::execLCP_PREPARE_REQ);
addRecSignal(GSN_LCP_FRAG_ORD, &Pgman::execLCP_FRAG_ORD);
addRecSignal(GSN_END_LCP_REQ, &Pgman::execEND_LCP_REQ);
......@@ -224,20 +223,20 @@ Pgman::execCONTINUEB(Signal* signal)
jam();
do_lcp_loop(signal);
break;
case PgmanContinueB::LCP_PREPARE:
case PgmanContinueB::LCP_LOCKED:
{
jam();
Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.getPtr(ptr, data1);
if (pl.next(ptr))
if (data1 != RNIL)
{
process_lcp_prepare(signal, ptr);
pl.getPtr(ptr, data1);
process_lcp_locked(signal, ptr);
}
else
{
signal->theData[0] = 0;
sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
}
return;
}
......@@ -1105,90 +1104,6 @@ Pgman::move_cleanup_ptr(Ptr<Page_entry> ptr)
// LCP
void
Pgman::execLCP_PREPARE_REQ(Signal* signal)
{
jamEntry();
/**
* Reserve pages for all LOCKED pages...
*/
Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
if (pl.first(ptr))
{
process_lcp_prepare(signal, ptr);
}
else
{
signal->theData[0] = 0;
sendSignal(DBLQH_REF, GSN_LCP_PREPARE_CONF, signal, 1, JBB);
}
}
void
Pgman::process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr)
{
ndbrequire(ptr.p->m_copy_page_i == RNIL);
Ptr<GlobalPage> copy;
ndbrequire(m_global_page_pool.seize(copy));
ptr.p->m_copy_page_i = copy.i;
DBG_LCP("assigning copy page to " << ptr << endl);
signal->theData[0] = PgmanContinueB::LCP_PREPARE;
signal->theData[1] = ptr.i;
sendSignal(PGMAN_REF, GSN_CONTINUEB, signal, 2, JBB);
}
int
Pgman::create_copy_page(Ptr<Page_entry> ptr, Uint32 req_flags)
{
DBG_LCP(<< ptr << " create_copy_page ");
if (! (req_flags & DIRTY_FLAGS) && ! (ptr.p->m_state & Page_entry::COPY))
{
DBG_LCP(" return original" << endl);
return ptr.p->m_real_page_i;
}
if (! (ptr.p->m_state & Page_entry::COPY))
{
ptr.p->m_state |= Page_entry::COPY;
Ptr<GlobalPage> src;
Ptr<GlobalPage> copy;
m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
memcpy(copy.p, src.p, sizeof(GlobalPage));
DBG_LCP("making copy... ");
}
DBG_LCP("return " << ptr.p->m_copy_page_i);
return ptr.p->m_copy_page_i;
}
void
Pgman::restore_copy_page(Ptr<Page_entry> ptr)
{
DBG_LCP(ptr << " restore_copy_page");
Uint32 copyPtrI = ptr.p->m_copy_page_i;
if (ptr.p->m_state & Page_entry::COPY)
{
DBG_LCP(" copy back");
Ptr<GlobalPage> src;
Ptr<GlobalPage> copy;
m_global_page_pool.getPtr(src, ptr.p->m_real_page_i);
m_global_page_pool.getPtr(copy, copyPtrI);
memcpy(src.p, copy.p, sizeof(GlobalPage));
}
m_global_page_pool.release(copyPtrI);
DBG_LCP(endl);
ptr.p->m_state &= ~Page_entry::COPY;
ptr.p->m_copy_page_i = RNIL;
}
void
Pgman::execLCP_FRAG_ORD(Signal* signal)
......@@ -1196,10 +1111,7 @@ Pgman::execLCP_FRAG_ORD(Signal* signal)
LcpFragOrd* ord = (LcpFragOrd*)signal->getDataPtr();
ndbrequire(ord->lcpId >= m_last_lcp_complete + 1 || m_last_lcp_complete == 0);
m_last_lcp = ord->lcpId;
DBG_LCP("execLCP_FRAG_ORD" << endl);
ndbrequire(!m_lcp_outstanding);
m_lcp_curr_bucket = 0;
DBG_LCP("Pgman::execLCP_FRAG_ORD lcp: " << m_last_lcp << endl);
#ifdef VM_TRACE
debugOut
......@@ -1207,8 +1119,6 @@ Pgman::execLCP_FRAG_ORD(Signal* signal)
<< " this=" << m_last_lcp << " last_complete=" << m_last_lcp_complete
<< " bucket=" << m_lcp_curr_bucket << endl;
#endif
do_lcp_loop(signal, true);
}
void
......@@ -1219,6 +1129,9 @@ Pgman::execEND_LCP_REQ(Signal* signal)
DBG_LCP("execEND_LCP_REQ" << endl);
ndbrequire(!m_lcp_outstanding);
m_lcp_curr_bucket = 0;
#ifdef VM_TRACE
debugOut
<< "PGMAN: execEND_LCP_REQ"
......@@ -1227,15 +1140,9 @@ Pgman::execEND_LCP_REQ(Signal* signal)
<< " outstanding=" << m_lcp_outstanding << endl;
#endif
if (m_last_lcp == m_last_lcp_complete)
{
ndbrequire(! m_lcp_loop_on);
signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
DBG_LCP("GSN_END_LCP_CONF" << endl);
}
m_last_lcp_complete = m_last_lcp;
do_lcp_loop(signal, true);
}
bool
......@@ -1257,9 +1164,6 @@ Pgman::process_lcp(Signal* signal)
// start or re-start from beginning of current hash bucket
if (m_lcp_curr_bucket != ~(Uint32)0)
{
DBG_LCP(" PROCESS LCP m_lcp_curr_bucket"
<< m_lcp_curr_bucket << endl);
Page_hashlist::Iterator iter;
pl_hash.next(m_lcp_curr_bucket, iter);
Uint32 loop = 0;
......@@ -1270,15 +1174,11 @@ Pgman::process_lcp(Signal* signal)
Ptr<Page_entry>& ptr = iter.curr;
Page_state state = ptr.p->m_state;
DBG_LCP("LCP "
<< " m_lcp_outstanding: " << m_lcp_outstanding
<< " max_count: " << max_count
<< " loop: " << loop
<< " iter.curr.i: " << iter.curr.i
<< " " << ptr);
DBG_LCP("LCP " << ptr << " - ");
if (ptr.p->m_last_lcp < m_last_lcp &&
(state & Page_entry::DIRTY))
(state & Page_entry::DIRTY) &&
(! (state & Page_entry::LOCKED)))
{
if(! (state & Page_entry::BOUND))
{
......@@ -1289,8 +1189,8 @@ Pgman::process_lcp(Signal* signal)
{
DBG_LCP(" BUSY" << endl);
break; // wait for it
}
if (state & Page_entry::PAGEOUT)
}
else if (state & Page_entry::PAGEOUT)
{
DBG_LCP(" PAGEOUT -> state |= LCP" << endl);
set_page_state(ptr, state | Page_entry::LCP);
......@@ -1306,11 +1206,6 @@ Pgman::process_lcp(Signal* signal)
ptr.p->m_last_lcp = m_last_lcp;
m_lcp_outstanding++;
}
else if (ptr.p->m_copy_page_i != RNIL)
{
DBG_LCP(" NOT DIRTY" << endl);
restore_copy_page(ptr);
}
else
{
DBG_LCP(" NOT DIRTY" << endl);
......@@ -1323,22 +1218,68 @@ Pgman::process_lcp(Signal* signal)
if (m_lcp_curr_bucket == ~(Uint32)0 && !m_lcp_outstanding)
{
if (m_last_lcp == m_last_lcp_complete)
Ptr<Page_entry> ptr;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
if (pl.first(ptr))
{
process_lcp_locked(signal, ptr);
}
else
{
signal->theData[0] = m_end_lcp_req.senderData;
sendSignal(m_end_lcp_req.senderRef, GSN_END_LCP_CONF, signal, 1, JBB);
DBG_LCP("GSN_END_LCP_CONF" << endl);
}
DBG_LCP(" -- RETURN FALSE" << endl);
m_last_lcp_complete = m_last_lcp;
m_lcp_curr_bucket = ~(Uint32)0;
return false;
}
DBG_LCP(" -- RETURN TRUE" << endl);
return true;
}
void
Pgman::process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr)
{
ptr.p->m_last_lcp = m_last_lcp;
if (ptr.p->m_state & Page_entry::DIRTY)
{
Ptr<GlobalPage> org, copy;
ndbrequire(m_global_page_pool.seize(copy));
m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
memcpy(copy.p, org.p, sizeof(GlobalPage));
ptr.p->m_copy_page_i = copy.i;
m_lcp_outstanding++;
ptr.p->m_state |= Page_entry::LCP;
pageout(signal, ptr);
return;
}
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.next(ptr);
signal->theData[0] = PgmanContinueB::LCP_LOCKED;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}
void
Pgman::process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
{
Ptr<GlobalPage> org, copy;
m_global_page_pool.getPtr(copy, ptr.p->m_copy_page_i);
m_global_page_pool.getPtr(org, ptr.p->m_real_page_i);
memcpy(org.p, copy.p, sizeof(GlobalPage));
m_global_page_pool.release(copy);
ptr.p->m_copy_page_i = RNIL;
Page_sublist& pl = *m_page_sublist[Page_entry::SL_LOCKED];
pl.next(ptr);
signal->theData[0] = PgmanContinueB::LCP_LOCKED;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
}
// page read and write
void
......@@ -1374,7 +1315,7 @@ Pgman::fsreadconf(Signal* signal, Ptr<Page_entry> ptr)
ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--;
ptr.p->m_last_lcp = m_last_lcp;
ptr.p->m_last_lcp = m_last_lcp_complete;
do_busy_loop(signal, true);
}
......@@ -1460,18 +1401,16 @@ Pgman::fswriteconf(Signal* signal, Ptr<Page_entry> ptr)
ndbrequire(m_stats.m_current_io_waits > 0);
m_stats.m_current_io_waits--;
if (ptr.p->m_copy_page_i != RNIL)
{
jam();
restore_copy_page(ptr);
state &= ~ Page_entry::COPY;
}
if (state & Page_entry::LCP)
{
ndbrequire(m_lcp_outstanding);
m_lcp_outstanding--;
state &= ~ Page_entry::LCP;
if (ptr.p->m_copy_page_i != RNIL)
{
process_lcp_locked_fswriteconf(signal, ptr);
}
}
set_page_state(ptr, state);
......@@ -1625,9 +1564,9 @@ Pgman::get_page(Signal* signal, Ptr<Page_entry> ptr, Page_request page_req)
! (req_flags & Page_request::UNLOCK_PAGE))
{
ptr.p->m_state |= (req_flags & DIRTY_FLAGS ? Page_entry::DIRTY : 0);
if (m_lcp_loop_on && ptr.p->m_copy_page_i != RNIL)
if (ptr.p->m_copy_page_i != RNIL)
{
return create_copy_page(ptr, req_flags);
return ptr.p->m_copy_page_i;
}
return ptr.p->m_real_page_i;
......
......@@ -305,7 +305,6 @@ private:
,PAGEIN = 0x0100 // paging in
,PAGEOUT = 0x0200 // paging out
,LOGSYNC = 0x0400 // undo WAL as part of pageout
,COPY = 0x0800 // Copy page for LCP
,LCP = 0x1000 // page is LCP flushed
,HOT = 0x2000 // page is hot
,ONSTACK = 0x4000 // page is on LIRS stack
......@@ -419,7 +418,6 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal);
void execCONTINUEB(Signal* signal);
void execLCP_PREPARE_REQ(Signal* signal);
void execLCP_FRAG_ORD(Signal*);
void execEND_LCP_REQ(Signal*);
......@@ -462,9 +460,8 @@ private:
void move_cleanup_ptr(Ptr<Page_entry> ptr);
bool process_lcp(Signal*);
void process_lcp_prepare(Signal* signal, Ptr<Page_entry> ptr);
int create_copy_page(Ptr<Page_entry>, Uint32 req_flags);
void restore_copy_page(Ptr<Page_entry>);
void process_lcp_locked(Signal* signal, Ptr<Page_entry> ptr);
void process_lcp_locked_fswriteconf(Signal* signal, Ptr<Page_entry> ptr);
void pagein(Signal*, Ptr<Page_entry>);
void fsreadreq(Signal*, Ptr<Page_entry>);
......
......@@ -1777,7 +1777,7 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
/**
* Handling of unmapped extent header pages is not implemented
*/
int flags = 0;
int flags = Page_cache_client::DIRTY_REQ;
int real_page_id;
if ((real_page_id = m_page_cache_client.get_page(signal, preq, flags)) > 0)
{
......@@ -1805,31 +1805,20 @@ Tsman::restart_undo_page_free_bits(Signal* signal,
lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32;
lsn += page->m_page_header.m_page_lsn_lo;
if (undo_lsn <= lsn)
{
/**
* Toggle word
*/
if (DBG_UNDO)
ndbout_c("tsman: apply %lld(%lld) %x -> %x",
undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
lsn = undo_lsn;
page->m_page_header.m_page_lsn_hi = lsn >> 32;
page->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
ndbassert((bits & ~(COMMITTED_MASK)) == 0);
header->update_free_bits(page_no_in_extent,
bits | (bits << UNCOMMITTED_SHIFT));
m_page_cache_client.update_lsn(preq.m_page, lsn);
}
else
/**
* Toggle word
*/
if (DBG_UNDO)
{
if (DBG_UNDO)
ndbout_c("tsman: apply %lld(%lld) %x -> %x",
undo_lsn, lsn, src, (bits | (bits << UNCOMMITTED_SHIFT)));
ndbout << "tsman: apply " << undo_lsn << "(" << lsn << ") "
<< *key << " " << (src & COMMITTED_MASK)
<< " -> " << bits << endl;
}
ndbassert((bits & ~(COMMITTED_MASK)) == 0);
header->update_free_bits(page_no_in_extent,
bits | (bits << UNCOMMITTED_SHIFT));
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment