Commit 5d92e4d1 authored by unknown's avatar unknown

ndb dd -

  Fix possible deadlock when running with small log buffer, causing buffer to be full, but not page getting flushed


storage/ndb/src/kernel/blocks/lgman.cpp:
  Fix possible deadlock when running with small log buffer, causing buffer to be full, but not page getting flushed
storage/ndb/src/kernel/blocks/lgman.hpp:
  Fix possible deadlock when running with small log buffer, causing buffer to be full, but not page getting flushed
parent 5739f659
...@@ -175,7 +175,7 @@ Lgman::execCONTINUEB(Signal* signal){ ...@@ -175,7 +175,7 @@ Lgman::execCONTINUEB(Signal* signal){
jam(); jam();
Ptr<Logfile_group> ptr; Ptr<Logfile_group> ptr;
m_logfile_group_pool.getPtr(ptr, ptrI); m_logfile_group_pool.getPtr(ptr, ptrI);
flush_log(signal, ptr); flush_log(signal, ptr, signal->theData[2]);
return; return;
} }
case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS: case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
...@@ -261,12 +261,25 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){ ...@@ -261,12 +261,25 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){
while(!ptr.isNull()) while(!ptr.isNull())
{ {
infoEvent("lfg %d state: %x fs: %d lsn " infoEvent("lfg %d state: %x fs: %d lsn "
"[ last: %lld s(req): %lld s:ed: %lld lcp: %lld ] waiters: %d", "[ last: %lld s(req): %lld s:ed: %lld lcp: %lld ] waiters: %d %d",
ptr.p->m_logfile_group_id, ptr.p->m_state, ptr.p->m_logfile_group_id, ptr.p->m_state,
ptr.p->m_outstanding_fs, ptr.p->m_outstanding_fs,
ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn, ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn,
ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn, ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn,
!ptr.p->m_log_buffer_waiters.isEmpty(),
!ptr.p->m_log_sync_waiters.isEmpty()); !ptr.p->m_log_sync_waiters.isEmpty());
if (!ptr.p->m_log_buffer_waiters.isEmpty())
{
Uint32 free_buffer= ptr.p->m_free_buffer_words;
LocalDLFifoList<Log_waiter>
list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
Ptr<Log_waiter> waiter;
list.first(waiter);
infoEvent(" free_buffer_words: %d head(waiters).sz: %d %d",
ptr.p->m_free_buffer_words,
waiter.p->m_size,
2*File_formats::UNDO_PAGE_WORDS);
}
m_logfile_group_list.next(ptr); m_logfile_group_list.next(ptr);
} }
} }
...@@ -398,7 +411,7 @@ Lgman::drop_filegroup_drop_files(Signal* signal, ...@@ -398,7 +411,7 @@ Lgman::drop_filegroup_drop_files(Signal* signal,
Uint32 ref, Uint32 data) Uint32 ref, Uint32 data)
{ {
jam(); jam();
ndbassert(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK)); ndbrequire(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
ndbrequire(ptr.p->m_meta_files.isEmpty()); ndbrequire(ptr.p->m_meta_files.isEmpty());
ndbrequire(ptr.p->m_outstanding_fs == 0); ndbrequire(ptr.p->m_outstanding_fs == 0);
...@@ -696,7 +709,8 @@ Lgman::create_file_commit(Signal* signal, ...@@ -696,7 +709,8 @@ Lgman::create_file_commit(Signal* signal,
lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD; lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
signal->theData[0] = LgmanContinueB::FLUSH_LOG; signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = lg_ptr.i; signal->theData[1] = lg_ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); signal->theData[2] = 0;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
} }
} }
else else
...@@ -1072,8 +1086,8 @@ Lgman::force_log_sync(Signal* signal, ...@@ -1072,8 +1086,8 @@ Lgman::force_log_sync(Signal* signal,
/** /**
* Update free space with extra NOOP * Update free space with extra NOOP
*/ */
ndbassert(ptr.p->m_free_file_words >= free); ndbrequire(ptr.p->m_free_file_words >= free);
ndbassert(ptr.p->m_free_buffer_words >= free); ndbrequire(ptr.p->m_free_buffer_words >= free);
ptr.p->m_free_file_words -= free; ptr.p->m_free_file_words -= free;
ptr.p->m_free_buffer_words -= free; ptr.p->m_free_buffer_words -= free;
...@@ -1090,7 +1104,7 @@ Lgman::force_log_sync(Signal* signal, ...@@ -1090,7 +1104,7 @@ Lgman::force_log_sync(Signal* signal,
last.p->m_sync_lsn > force_lsn && last.p->m_sync_lsn > force_lsn &&
ptr.p->m_last_sync_req_lsn < last.p->m_sync_lsn) ptr.p->m_last_sync_req_lsn < last.p->m_sync_lsn)
{ {
ndbassert(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD); ndbrequire(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC; signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
signal->theData[1] = ptr.i; signal->theData[1] = ptr.i;
signal->theData[2] = last.p->m_sync_lsn >> 32; signal->theData[2] = last.p->m_sync_lsn >> 32;
...@@ -1157,7 +1171,7 @@ Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz) ...@@ -1157,7 +1171,7 @@ Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
{ {
next: next:
// fits this page wo/ problem // fits this page wo/ problem
ndbassert(total_free >= sz); ndbrequire(total_free >= sz);
ptr.p->m_free_buffer_words = total_free - sz; ptr.p->m_free_buffer_words = total_free - sz;
ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz; ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
return ((File_formats::Undofile::Undo_page*)page)->m_data + pos; return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
...@@ -1176,7 +1190,7 @@ Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz) ...@@ -1176,7 +1190,7 @@ Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
/** /**
* Update free space with extra NOOP * Update free space with extra NOOP
*/ */
ndbassert(ptr.p->m_free_file_words >= free); ndbrequire(ptr.p->m_free_file_words >= free);
ptr.p->m_free_file_words -= free; ptr.p->m_free_file_words -= free;
validate_logfile_group(ptr, "get_log_buffer"); validate_logfile_group(ptr, "get_log_buffer");
...@@ -1246,17 +1260,15 @@ Logfile_client::get_log_buffer(Signal* signal, Uint32 sz, ...@@ -1246,17 +1260,15 @@ Logfile_client::get_log_buffer(Signal* signal, Uint32 sz,
empty= list.isEmpty(); empty= list.isEmpty();
if(!list.seize(wait)) if(!list.seize(wait))
{
return -1; return -1;
}
wait.p->m_size= sz; wait.p->m_size= sz;
wait.p->m_block= m_block; wait.p->m_block= m_block;
memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::Callback)); memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::Callback));
} }
if(empty)
{ // Start ContinueB
m_lgman->process_log_buffer_waiters(signal, ptr);
}
return 0; return 0;
} }
return -1; return -1;
...@@ -1283,7 +1295,7 @@ operator<<(NdbOut& out, const Lgman::Logfile_group::Position& pos) ...@@ -1283,7 +1295,7 @@ operator<<(NdbOut& out, const Lgman::Logfile_group::Position& pos)
} }
void void
Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr) Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
{ {
Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER]; Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
Logfile_group::Position producer= ptr.p->m_pos[PRODUCER]; Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
...@@ -1292,26 +1304,75 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1292,26 +1304,75 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr)
if(consumer.m_current_page == producer.m_current_page) if(consumer.m_current_page == producer.m_current_page)
{ {
#if 0 #if 0
ndbout_c("ptr.p->m_file_pos[HEAD].m_ptr_i= %x", if (force)
ptr.p->m_file_pos[HEAD].m_ptr_i); {
ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d", ndbout_c("force: %d ptr.p->m_file_pos[HEAD].m_ptr_i= %x",
consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx, force, ptr.p->m_file_pos[HEAD].m_ptr_i);
producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx); ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
}
#endif #endif
if (! (ptr.p->m_state & Logfile_group::LG_DROPPING)) if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
{ {
jam(); jam();
signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = ptr.i; if (ptr.p->m_log_buffer_waiters.isEmpty() || ptr.p->m_outstanding_fs)
sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2); {
force = 0;
}
if (force < 2)
{
signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = ptr.i;
signal->theData[2] = force + 1;
sendSignalWithDelay(reference(), GSN_CONTINUEB, signal,
force ? 10 : 100, 3);
return;
}
else
{
Buffer_idx pos= producer.m_current_pos;
GlobalPage *page = m_global_page_pool.getPtr(pos.m_ptr_i);
Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
ndbout_c("force flush %d", free);
ndbrequire(pos.m_idx); // don't flush empty page...
Uint64 lsn= ptr.p->m_last_lsn - 1;
File_formats::Undofile::Undo_page* undo=
(File_formats::Undofile::Undo_page*)page;
undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
undo->m_page_header.m_page_lsn_hi = lsn >> 32;
undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
/**
* Update free space with extra NOOP
*/
ndbrequire(ptr.p->m_free_file_words >= free);
ndbrequire(ptr.p->m_free_buffer_words >= free);
ptr.p->m_free_file_words -= free;
ptr.p->m_free_buffer_words -= free;
validate_logfile_group(ptr, "force_log_flush");
next_page(ptr.p, PRODUCER);
ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
producer = ptr.p->m_pos[PRODUCER];
// break through
}
} }
else else
{ {
jam(); jam();
ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD; ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
return;
} }
return;
} }
bool full= false; bool full= false;
...@@ -1339,7 +1400,7 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1339,7 +1400,7 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr)
else else
{ {
// Only 1 chunk // Only 1 chunk
ndbassert(ptr.p->m_buffer_pages.getSize() == 2); ndbrequire(ptr.p->m_buffer_pages.getSize() == 2);
Uint32 tmp= consumer.m_current_page.m_idx + 1; Uint32 tmp= consumer.m_current_page.m_idx + 1;
cnt= write_log_pages(signal, ptr, page, tmp); cnt= write_log_pages(signal, ptr, page, tmp);
assert(cnt <= tmp); assert(cnt <= tmp);
...@@ -1407,19 +1468,13 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1407,19 +1468,13 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr)
{ {
signal->theData[0] = LgmanContinueB::FLUSH_LOG; signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = ptr.i; signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); signal->theData[2] = 0;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
} }
else else
{ {
ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD; ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
} }
if(tot > 0 && !ptr.p->m_log_buffer_waiters.isEmpty() &&
!(ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
{
jam();
process_log_buffer_waiters(signal, ptr);
}
} }
void void
...@@ -1457,7 +1512,7 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1457,7 +1512,7 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
} }
else else
{ {
ptr.p->m_state &= (Uint32)Logfile_group::LG_WAITERS_THREAD; ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
} }
} }
...@@ -1593,14 +1648,14 @@ Lgman::execFSWRITECONF(Signal* signal) ...@@ -1593,14 +1648,14 @@ Lgman::execFSWRITECONF(Signal* signal)
Ptr<Undofile> ptr; Ptr<Undofile> ptr;
m_file_pool.getPtr(ptr, conf->userPointer); m_file_pool.getPtr(ptr, conf->userPointer);
ndbassert(ptr.p->m_state & Undofile::FS_OUTSTANDING); ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING; ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
Ptr<Logfile_group> lg_ptr; Ptr<Logfile_group> lg_ptr;
m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i); m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
Uint32 cnt= lg_ptr.p->m_outstanding_fs; Uint32 cnt= lg_ptr.p->m_outstanding_fs;
ndbassert(cnt); ndbrequire(cnt);
if(lg_ptr.p->m_next_reply_ptr_i == ptr.i) if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
{ {
...@@ -1611,7 +1666,7 @@ Lgman::execFSWRITECONF(Signal* signal) ...@@ -1611,7 +1666,7 @@ Lgman::execFSWRITECONF(Signal* signal)
{ {
Uint32 state= ptr.p->m_state; Uint32 state= ptr.p->m_state;
Uint32 pages= ptr.p->m_online.m_outstanding; Uint32 pages= ptr.p->m_online.m_outstanding;
ndbassert(pages); ndbrequire(pages);
ptr.p->m_online.m_outstanding= 0; ptr.p->m_online.m_outstanding= 0;
ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT; ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
tot += pages; tot += pages;
...@@ -1632,6 +1687,11 @@ Lgman::execFSWRITECONF(Signal* signal) ...@@ -1632,6 +1687,11 @@ Lgman::execFSWRITECONF(Signal* signal)
{ {
process_log_sync_waiters(signal, lg_ptr); process_log_sync_waiters(signal, lg_ptr);
} }
if(! (lg_ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
{
process_log_buffer_waiters(signal, lg_ptr);
}
} }
return; return;
...@@ -1679,7 +1739,7 @@ Lgman::execLCP_FRAG_ORD(Signal* signal) ...@@ -1679,7 +1739,7 @@ Lgman::execLCP_FRAG_ORD(Signal* signal)
undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16; undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2); Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
memcpy(dst, undo, sizeof(undo)); memcpy(dst, undo, sizeof(undo));
ndbassert(ptr.p->m_free_file_words >= (sizeof(undo) >> 2)); ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
ptr.p->m_free_file_words -= (sizeof(undo) >> 2); ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
} }
else else
...@@ -1688,7 +1748,7 @@ Lgman::execLCP_FRAG_ORD(Signal* signal) ...@@ -1688,7 +1748,7 @@ Lgman::execLCP_FRAG_ORD(Signal* signal)
* dst++ = last_lsn >> 32; * dst++ = last_lsn >> 32;
* dst++ = last_lsn & 0xFFFFFFFF; * dst++ = last_lsn & 0xFFFFFFFF;
memcpy(dst, undo, sizeof(undo)); memcpy(dst, undo, sizeof(undo));
ndbassert(ptr.p->m_free_file_words >= (sizeof(undo) >> 2)); ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2); ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
} }
ptr.p->m_last_lcp_lsn = last_lsn; ptr.p->m_last_lcp_lsn = last_lsn;
...@@ -1802,13 +1862,13 @@ Lgman::cut_log_tail(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1802,13 +1862,13 @@ Lgman::cut_log_tail(Signal* signal, Ptr<Logfile_group> ptr)
Ptr<Undofile> next = filePtr; Ptr<Undofile> next = filePtr;
LocalDLFifoList<Undofile> files(m_file_pool, ptr.p->m_files); LocalDLFifoList<Undofile> files(m_file_pool, ptr.p->m_files);
while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY)) while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY))
ndbassert(next.i != filePtr.i); ndbrequire(next.i != filePtr.i);
if(next.isNull()) if(next.isNull())
{ {
jam(); jam();
files.first(next); files.first(next);
while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next)) while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next))
ndbassert(next.i != filePtr.i); ndbrequire(next.i != filePtr.i);
} }
tmp.m_idx= 0; tmp.m_idx= 0;
...@@ -1862,7 +1922,7 @@ Lgman::execSUB_GCP_COMPLETE_REP(Signal* signal) ...@@ -1862,7 +1922,7 @@ Lgman::execSUB_GCP_COMPLETE_REP(Signal* signal)
int int
Lgman::alloc_log_space(Uint32 ref, Uint32 words) Lgman::alloc_log_space(Uint32 ref, Uint32 words)
{ {
ndbassert(words); ndbrequire(words);
words += 2; // lsn words += 2; // lsn
Logfile_group key; Logfile_group key;
key.m_logfile_group_id= ref; key.m_logfile_group_id= ref;
...@@ -1886,7 +1946,7 @@ Lgman::alloc_log_space(Uint32 ref, Uint32 words) ...@@ -1886,7 +1946,7 @@ Lgman::alloc_log_space(Uint32 ref, Uint32 words)
int int
Lgman::free_log_space(Uint32 ref, Uint32 words) Lgman::free_log_space(Uint32 ref, Uint32 words)
{ {
ndbassert(words); ndbrequire(words);
Logfile_group key; Logfile_group key;
key.m_logfile_group_id= ref; key.m_logfile_group_id= ref;
Ptr<Logfile_group> ptr; Ptr<Logfile_group> ptr;
...@@ -1896,7 +1956,7 @@ Lgman::free_log_space(Uint32 ref, Uint32 words) ...@@ -1896,7 +1956,7 @@ Lgman::free_log_space(Uint32 ref, Uint32 words)
validate_logfile_group(ptr, "free_log_space"); validate_logfile_group(ptr, "free_log_space");
return 0; return 0;
} }
ndbassert(false); ndbrequire(false);
return -1; return -1;
} }
...@@ -2031,7 +2091,7 @@ Lgman::find_log_head(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -2031,7 +2091,7 @@ Lgman::find_log_head(Signal* signal, Ptr<Logfile_group> ptr)
* All files have read first page * All files have read first page
* and m_files is sorted acording to lsn * and m_files is sorted acording to lsn
*/ */
ndbassert(!ptr.p->m_files.isEmpty()); ndbrequire(!ptr.p->m_files.isEmpty());
LocalDLFifoList<Undofile> read_files(m_file_pool, ptr.p->m_files); LocalDLFifoList<Undofile> read_files(m_file_pool, ptr.p->m_files);
read_files.last(file_ptr); read_files.last(file_ptr);
...@@ -2080,11 +2140,11 @@ Lgman::execFSREADCONF(Signal* signal) ...@@ -2080,11 +2140,11 @@ Lgman::execFSREADCONF(Signal* signal)
m_file_pool.getPtr(ptr, conf->userPointer); m_file_pool.getPtr(ptr, conf->userPointer);
m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i); m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
ndbassert(ptr.p->m_state & Undofile::FS_OUTSTANDING); ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING; ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
Uint32 cnt= lg_ptr.p->m_outstanding_fs; Uint32 cnt= lg_ptr.p->m_outstanding_fs;
ndbassert(cnt); ndbrequire(cnt);
if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING) if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING)
{ {
...@@ -2098,7 +2158,7 @@ Lgman::execFSREADCONF(Signal* signal) ...@@ -2098,7 +2158,7 @@ Lgman::execFSREADCONF(Signal* signal)
{ {
Uint32 state= ptr.p->m_state; Uint32 state= ptr.p->m_state;
Uint32 pages= ptr.p->m_online.m_outstanding; Uint32 pages= ptr.p->m_online.m_outstanding;
ndbassert(pages); ndbrequire(pages);
ptr.p->m_online.m_outstanding= 0; ptr.p->m_online.m_outstanding= 0;
ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT; ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
tot += pages; tot += pages;
...@@ -2205,7 +2265,7 @@ Lgman::find_log_head_in_file(Signal* signal, ...@@ -2205,7 +2265,7 @@ Lgman::find_log_head_in_file(Signal* signal,
Uint32 head= ptr.p->m_file_pos[HEAD].m_idx; Uint32 head= ptr.p->m_file_pos[HEAD].m_idx;
Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx; Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx;
ndbassert(head > tail); ndbrequire(head > tail);
Uint32 diff = head - tail; Uint32 diff = head - tail;
if(DEBUG_SEARCH_LOG_HEAD) if(DEBUG_SEARCH_LOG_HEAD)
...@@ -2442,23 +2502,23 @@ Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -2442,23 +2502,23 @@ Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr)
{ {
Uint32 max= Uint32 max=
producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1; producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1;
ndbassert(free >= max * File_formats::UNDO_PAGE_WORDS); ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max); cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
ndbassert(cnt <= max); ndbrequire(cnt <= max);
producer.m_current_pos.m_ptr_i -= cnt; producer.m_current_pos.m_ptr_i -= cnt;
producer.m_current_page.m_idx -= cnt; producer.m_current_page.m_idx -= cnt;
} }
else else
{ {
Uint32 max= producer.m_current_page.m_idx; Uint32 max= producer.m_current_page.m_idx;
ndbassert(free >= max * File_formats::UNDO_PAGE_WORDS); ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max); cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
ndbassert(cnt <= max); ndbrequire(cnt <= max);
producer.m_current_pos.m_ptr_i -= cnt; producer.m_current_pos.m_ptr_i -= cnt;
producer.m_current_page.m_idx -= cnt; producer.m_current_page.m_idx -= cnt;
} }
ndbassert(free >= cnt * File_formats::UNDO_PAGE_WORDS); ndbrequire(free >= cnt * File_formats::UNDO_PAGE_WORDS);
free -= (cnt * File_formats::UNDO_PAGE_WORDS); free -= (cnt * File_formats::UNDO_PAGE_WORDS);
ptr.p->m_free_buffer_words = free; ptr.p->m_free_buffer_words = free;
ptr.p->m_pos[PRODUCER] = producer; ptr.p->m_pos[PRODUCER] = producer;
...@@ -2476,7 +2536,7 @@ Uint32 ...@@ -2476,7 +2536,7 @@ Uint32
Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr, Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr,
Uint32 pageId, Uint32 pages) Uint32 pageId, Uint32 pages)
{ {
ndbassert(pages); ndbrequire(pages);
Ptr<Undofile> filePtr; Ptr<Undofile> filePtr;
Buffer_idx tail= ptr.p->m_file_pos[TAIL]; Buffer_idx tail= ptr.p->m_file_pos[TAIL];
m_file_pool.getPtr(filePtr, tail.m_ptr_i); m_file_pool.getPtr(filePtr, tail.m_ptr_i);
...@@ -2525,7 +2585,7 @@ Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr, ...@@ -2525,7 +2585,7 @@ Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr,
{ {
jam(); jam();
ndbassert(tail.m_idx - max == 0); ndbrequire(tail.m_idx - max == 0);
req->varIndex = 1; req->varIndex = 1;
req->numberOfPages = max; req->numberOfPages = max;
req->data.pageData[0] = pageId - max; req->data.pageData[0] = pageId - max;
...@@ -2651,7 +2711,7 @@ Lgman::get_next_undo_record(Uint64 * this_lsn) ...@@ -2651,7 +2711,7 @@ Lgman::get_next_undo_record(Uint64 * this_lsn)
Uint32 *record= pageP->m_data + pos - 1; Uint32 *record= pageP->m_data + pos - 1;
Uint32 len= (* record) & 0xFFFF; Uint32 len= (* record) & 0xFFFF;
ndbassert(len); ndbrequire(len);
Uint32 *prev= record - len; Uint32 *prev= record - len;
Uint64 lsn = 0; Uint64 lsn = 0;
...@@ -2663,7 +2723,7 @@ Lgman::get_next_undo_record(Uint64 * this_lsn) ...@@ -2663,7 +2723,7 @@ Lgman::get_next_undo_record(Uint64 * this_lsn)
} }
else else
{ {
ndbassert(pos >= 3); ndbrequire(pos >= 3);
lsn += * (prev - 1); lsn <<= 32; lsn += * (prev - 1); lsn <<= 32;
lsn += * (prev - 0); lsn += * (prev - 0);
len += 2; len += 2;
...@@ -2671,14 +2731,14 @@ Lgman::get_next_undo_record(Uint64 * this_lsn) ...@@ -2671,14 +2731,14 @@ Lgman::get_next_undo_record(Uint64 * this_lsn)
} }
ndbassert(pos >= len); ndbrequire(pos >= len);
if(pos == len) if(pos == len)
{ {
/** /**
* Switching page * Switching page
*/ */
ndbassert(producer.m_current_pos.m_idx); ndbrequire(producer.m_current_pos.m_idx);
ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --; ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --;
if(consumer.m_current_page.m_idx) if(consumer.m_current_page.m_idx)
...@@ -2723,7 +2783,7 @@ Lgman::get_next_undo_record(Uint64 * this_lsn) ...@@ -2723,7 +2783,7 @@ Lgman::get_next_undo_record(Uint64 * this_lsn)
prev = pageP->m_data + pos - 1; prev = pageP->m_data + pos - 1;
if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN) if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
ndbassert(lsn + 1 == ptr.p->m_last_read_lsn); ndbrequire(lsn + 1 == ptr.p->m_last_read_lsn);
ptr.p->m_pos[CONSUMER] = consumer; ptr.p->m_pos[CONSUMER] = consumer;
ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS; ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS;
...@@ -2785,7 +2845,7 @@ Lgman::stop_run_undo_log(Signal* signal) ...@@ -2785,7 +2845,7 @@ Lgman::stop_run_undo_log(Signal* signal)
/** /**
* Fix log TAIL * Fix log TAIL
*/ */
ndbassert(ptr.p->m_state == 0); ndbrequire(ptr.p->m_state == 0);
ptr.p->m_state = Logfile_group::LG_ONLINE; ptr.p->m_state = Logfile_group::LG_ONLINE;
Buffer_idx tail= ptr.p->m_file_pos[TAIL]; Buffer_idx tail= ptr.p->m_file_pos[TAIL];
Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx; Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
...@@ -2796,7 +2856,7 @@ Lgman::stop_run_undo_log(Signal* signal) ...@@ -2796,7 +2856,7 @@ Lgman::stop_run_undo_log(Signal* signal)
m_file_pool.getPtr(file, tail.m_ptr_i); m_file_pool.getPtr(file, tail.m_ptr_i);
Uint32 page= tail.m_idx; Uint32 page= tail.m_idx;
Uint32 size= file.p->m_file_size; Uint32 size= file.p->m_file_size;
ndbassert(size >= page); ndbrequire(size >= page);
Uint32 diff= size - page; Uint32 diff= size - page;
if(pages >= diff) if(pages >= diff)
...@@ -2827,7 +2887,8 @@ Lgman::stop_run_undo_log(Signal* signal) ...@@ -2827,7 +2887,8 @@ Lgman::stop_run_undo_log(Signal* signal)
ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD; ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
signal->theData[0] = LgmanContinueB::FLUSH_LOG; signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = ptr.i; signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); signal->theData[2] = 0;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
if(1) if(1)
{ {
...@@ -2914,7 +2975,7 @@ Lgman::execEND_LCP_CONF(Signal* signal) ...@@ -2914,7 +2975,7 @@ Lgman::execEND_LCP_CONF(Signal* signal)
undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16; undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2); Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
memcpy(dst, undo, sizeof(undo)); memcpy(dst, undo, sizeof(undo));
ndbassert(ptr.p->m_free_file_words >= (sizeof(undo) >> 2)); ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
ptr.p->m_free_file_words -= (sizeof(undo) >> 2); ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
} }
else else
...@@ -2923,7 +2984,7 @@ Lgman::execEND_LCP_CONF(Signal* signal) ...@@ -2923,7 +2984,7 @@ Lgman::execEND_LCP_CONF(Signal* signal)
* dst++ = last_lsn >> 32; * dst++ = last_lsn >> 32;
* dst++ = last_lsn & 0xFFFFFFFF; * dst++ = last_lsn & 0xFFFFFFFF;
memcpy(dst, undo, sizeof(undo)); memcpy(dst, undo, sizeof(undo));
ndbassert(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2)); ndbrequire(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2); ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
} }
m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1; m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
......
...@@ -258,7 +258,7 @@ private: ...@@ -258,7 +258,7 @@ private:
void endlcp_callback(Signal*, Uint32, Uint32); void endlcp_callback(Signal*, Uint32, Uint32);
void open_file(Signal*, Ptr<Undofile>, Uint32 requestInfo); void open_file(Signal*, Ptr<Undofile>, Uint32 requestInfo);
void flush_log(Signal*, Ptr<Logfile_group>); void flush_log(Signal*, Ptr<Logfile_group>, Uint32 force);
Uint32 write_log_pages(Signal*, Ptr<Logfile_group>, Uint32 write_log_pages(Signal*, Ptr<Logfile_group>,
Uint32 pageId, Uint32 pages); Uint32 pageId, Uint32 pages);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment