Commit e29db067 authored by jonas@eel.(none)'s avatar jonas@eel.(none)

ndb dd -

  Fix possible deadlock when running with small log buffer, causing buffer to be full, but not page getting flushed
parent 4130141a
......@@ -175,7 +175,7 @@ Lgman::execCONTINUEB(Signal* signal){
jam();
Ptr<Logfile_group> ptr;
m_logfile_group_pool.getPtr(ptr, ptrI);
flush_log(signal, ptr);
flush_log(signal, ptr, signal->theData[2]);
return;
}
case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS:
......@@ -261,12 +261,25 @@ Lgman::execDUMP_STATE_ORD(Signal* signal){
while(!ptr.isNull())
{
infoEvent("lfg %d state: %x fs: %d lsn "
"[ last: %lld s(req): %lld s:ed: %lld lcp: %lld ] waiters: %d",
"[ last: %lld s(req): %lld s:ed: %lld lcp: %lld ] waiters: %d %d",
ptr.p->m_logfile_group_id, ptr.p->m_state,
ptr.p->m_outstanding_fs,
ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn,
ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn,
!ptr.p->m_log_buffer_waiters.isEmpty(),
!ptr.p->m_log_sync_waiters.isEmpty());
if (!ptr.p->m_log_buffer_waiters.isEmpty())
{
Uint32 free_buffer= ptr.p->m_free_buffer_words;
LocalDLFifoList<Log_waiter>
list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters);
Ptr<Log_waiter> waiter;
list.first(waiter);
infoEvent(" free_buffer_words: %d head(waiters).sz: %d %d",
ptr.p->m_free_buffer_words,
waiter.p->m_size,
2*File_formats::UNDO_PAGE_WORDS);
}
m_logfile_group_list.next(ptr);
}
}
......@@ -398,7 +411,7 @@ Lgman::drop_filegroup_drop_files(Signal* signal,
Uint32 ref, Uint32 data)
{
jam();
ndbassert(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
ndbrequire(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK));
ndbrequire(ptr.p->m_meta_files.isEmpty());
ndbrequire(ptr.p->m_outstanding_fs == 0);
......@@ -696,7 +709,8 @@ Lgman::create_file_commit(Signal* signal,
lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = lg_ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
signal->theData[2] = 0;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
}
}
else
......@@ -1072,8 +1086,8 @@ Lgman::force_log_sync(Signal* signal,
/**
* Update free space with extra NOOP
*/
ndbassert(ptr.p->m_free_file_words >= free);
ndbassert(ptr.p->m_free_buffer_words >= free);
ndbrequire(ptr.p->m_free_file_words >= free);
ndbrequire(ptr.p->m_free_buffer_words >= free);
ptr.p->m_free_file_words -= free;
ptr.p->m_free_buffer_words -= free;
......@@ -1090,7 +1104,7 @@ Lgman::force_log_sync(Signal* signal,
last.p->m_sync_lsn > force_lsn &&
ptr.p->m_last_sync_req_lsn < last.p->m_sync_lsn)
{
ndbassert(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
ndbrequire(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD);
signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC;
signal->theData[1] = ptr.i;
signal->theData[2] = last.p->m_sync_lsn >> 32;
......@@ -1157,7 +1171,7 @@ Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
{
next:
// fits this page wo/ problem
ndbassert(total_free >= sz);
ndbrequire(total_free >= sz);
ptr.p->m_free_buffer_words = total_free - sz;
ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
......@@ -1176,7 +1190,7 @@ next:
/**
* Update free space with extra NOOP
*/
ndbassert(ptr.p->m_free_file_words >= free);
ndbrequire(ptr.p->m_free_file_words >= free);
ptr.p->m_free_file_words -= free;
validate_logfile_group(ptr, "get_log_buffer");
......@@ -1246,17 +1260,15 @@ Logfile_client::get_log_buffer(Signal* signal, Uint32 sz,
empty= list.isEmpty();
if(!list.seize(wait))
{
return -1;
}
wait.p->m_size= sz;
wait.p->m_block= m_block;
memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::Callback));
}
if(empty)
{ // Start ContinueB
m_lgman->process_log_buffer_waiters(signal, ptr);
}
return 0;
}
return -1;
......@@ -1283,7 +1295,7 @@ operator<<(NdbOut& out, const Lgman::Logfile_group::Position& pos)
}
void
Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr)
Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
{
Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER];
Logfile_group::Position producer= ptr.p->m_pos[PRODUCER];
......@@ -1292,26 +1304,75 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr)
if(consumer.m_current_page == producer.m_current_page)
{
#if 0
ndbout_c("ptr.p->m_file_pos[HEAD].m_ptr_i= %x",
ptr.p->m_file_pos[HEAD].m_ptr_i);
ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
if (force)
{
ndbout_c("force: %d ptr.p->m_file_pos[HEAD].m_ptr_i= %x",
force, ptr.p->m_file_pos[HEAD].m_ptr_i);
ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d",
consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx,
producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx);
}
#endif
if (! (ptr.p->m_state & Logfile_group::LG_DROPPING))
{
jam();
signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = ptr.i;
sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2);
if (ptr.p->m_log_buffer_waiters.isEmpty() || ptr.p->m_outstanding_fs)
{
force = 0;
}
if (force < 2)
{
signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = ptr.i;
signal->theData[2] = force + 1;
sendSignalWithDelay(reference(), GSN_CONTINUEB, signal,
force ? 10 : 100, 3);
return;
}
else
{
Buffer_idx pos= producer.m_current_pos;
GlobalPage *page = m_global_page_pool.getPtr(pos.m_ptr_i);
Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
ndbout_c("force flush %d", free);
ndbrequire(pos.m_idx); // don't flush empty page...
Uint64 lsn= ptr.p->m_last_lsn - 1;
File_formats::Undofile::Undo_page* undo=
(File_formats::Undofile::Undo_page*)page;
undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF;
undo->m_page_header.m_page_lsn_hi = lsn >> 32;
undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free;
/**
* Update free space with extra NOOP
*/
ndbrequire(ptr.p->m_free_file_words >= free);
ndbrequire(ptr.p->m_free_buffer_words >= free);
ptr.p->m_free_file_words -= free;
ptr.p->m_free_buffer_words -= free;
validate_logfile_group(ptr, "force_log_flush");
next_page(ptr.p, PRODUCER);
ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0;
producer = ptr.p->m_pos[PRODUCER];
// break through
}
}
else
{
jam();
ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
return;
}
return;
}
bool full= false;
......@@ -1339,7 +1400,7 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr)
else
{
// Only 1 chunk
ndbassert(ptr.p->m_buffer_pages.getSize() == 2);
ndbrequire(ptr.p->m_buffer_pages.getSize() == 2);
Uint32 tmp= consumer.m_current_page.m_idx + 1;
cnt= write_log_pages(signal, ptr, page, tmp);
assert(cnt <= tmp);
......@@ -1407,19 +1468,13 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr)
{
signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
signal->theData[2] = 0;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
}
else
{
ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD;
}
if(tot > 0 && !ptr.p->m_log_buffer_waiters.isEmpty() &&
!(ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
{
jam();
process_log_buffer_waiters(signal, ptr);
}
}
void
......@@ -1457,7 +1512,7 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
}
else
{
ptr.p->m_state &= (Uint32)Logfile_group::LG_WAITERS_THREAD;
ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
}
}
......@@ -1593,14 +1648,14 @@ Lgman::execFSWRITECONF(Signal* signal)
Ptr<Undofile> ptr;
m_file_pool.getPtr(ptr, conf->userPointer);
ndbassert(ptr.p->m_state & Undofile::FS_OUTSTANDING);
ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
Ptr<Logfile_group> lg_ptr;
m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
Uint32 cnt= lg_ptr.p->m_outstanding_fs;
ndbassert(cnt);
ndbrequire(cnt);
if(lg_ptr.p->m_next_reply_ptr_i == ptr.i)
{
......@@ -1611,7 +1666,7 @@ Lgman::execFSWRITECONF(Signal* signal)
{
Uint32 state= ptr.p->m_state;
Uint32 pages= ptr.p->m_online.m_outstanding;
ndbassert(pages);
ndbrequire(pages);
ptr.p->m_online.m_outstanding= 0;
ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
tot += pages;
......@@ -1632,6 +1687,11 @@ Lgman::execFSWRITECONF(Signal* signal)
{
process_log_sync_waiters(signal, lg_ptr);
}
if(! (lg_ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD))
{
process_log_buffer_waiters(signal, lg_ptr);
}
}
return;
......@@ -1679,7 +1739,7 @@ Lgman::execLCP_FRAG_ORD(Signal* signal)
undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
memcpy(dst, undo, sizeof(undo));
ndbassert(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
}
else
......@@ -1688,7 +1748,7 @@ Lgman::execLCP_FRAG_ORD(Signal* signal)
* dst++ = last_lsn >> 32;
* dst++ = last_lsn & 0xFFFFFFFF;
memcpy(dst, undo, sizeof(undo));
ndbassert(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
}
ptr.p->m_last_lcp_lsn = last_lsn;
......@@ -1802,13 +1862,13 @@ Lgman::cut_log_tail(Signal* signal, Ptr<Logfile_group> ptr)
Ptr<Undofile> next = filePtr;
LocalDLFifoList<Undofile> files(m_file_pool, ptr.p->m_files);
while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY))
ndbassert(next.i != filePtr.i);
ndbrequire(next.i != filePtr.i);
if(next.isNull())
{
jam();
files.first(next);
while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next))
ndbassert(next.i != filePtr.i);
ndbrequire(next.i != filePtr.i);
}
tmp.m_idx= 0;
......@@ -1862,7 +1922,7 @@ Lgman::execSUB_GCP_COMPLETE_REP(Signal* signal)
int
Lgman::alloc_log_space(Uint32 ref, Uint32 words)
{
ndbassert(words);
ndbrequire(words);
words += 2; // lsn
Logfile_group key;
key.m_logfile_group_id= ref;
......@@ -1886,7 +1946,7 @@ Lgman::alloc_log_space(Uint32 ref, Uint32 words)
int
Lgman::free_log_space(Uint32 ref, Uint32 words)
{
ndbassert(words);
ndbrequire(words);
Logfile_group key;
key.m_logfile_group_id= ref;
Ptr<Logfile_group> ptr;
......@@ -1896,7 +1956,7 @@ Lgman::free_log_space(Uint32 ref, Uint32 words)
validate_logfile_group(ptr, "free_log_space");
return 0;
}
ndbassert(false);
ndbrequire(false);
return -1;
}
......@@ -2031,7 +2091,7 @@ Lgman::find_log_head(Signal* signal, Ptr<Logfile_group> ptr)
* All files have read first page
* and m_files is sorted acording to lsn
*/
ndbassert(!ptr.p->m_files.isEmpty());
ndbrequire(!ptr.p->m_files.isEmpty());
LocalDLFifoList<Undofile> read_files(m_file_pool, ptr.p->m_files);
read_files.last(file_ptr);
......@@ -2080,11 +2140,11 @@ Lgman::execFSREADCONF(Signal* signal)
m_file_pool.getPtr(ptr, conf->userPointer);
m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i);
ndbassert(ptr.p->m_state & Undofile::FS_OUTSTANDING);
ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING);
ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING;
Uint32 cnt= lg_ptr.p->m_outstanding_fs;
ndbassert(cnt);
ndbrequire(cnt);
if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING)
{
......@@ -2098,7 +2158,7 @@ Lgman::execFSREADCONF(Signal* signal)
{
Uint32 state= ptr.p->m_state;
Uint32 pages= ptr.p->m_online.m_outstanding;
ndbassert(pages);
ndbrequire(pages);
ptr.p->m_online.m_outstanding= 0;
ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT;
tot += pages;
......@@ -2205,7 +2265,7 @@ Lgman::find_log_head_in_file(Signal* signal,
Uint32 head= ptr.p->m_file_pos[HEAD].m_idx;
Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx;
ndbassert(head > tail);
ndbrequire(head > tail);
Uint32 diff = head - tail;
if(DEBUG_SEARCH_LOG_HEAD)
......@@ -2442,23 +2502,23 @@ Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr)
{
Uint32 max=
producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1;
ndbassert(free >= max * File_formats::UNDO_PAGE_WORDS);
ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
ndbassert(cnt <= max);
ndbrequire(cnt <= max);
producer.m_current_pos.m_ptr_i -= cnt;
producer.m_current_page.m_idx -= cnt;
}
else
{
Uint32 max= producer.m_current_page.m_idx;
ndbassert(free >= max * File_formats::UNDO_PAGE_WORDS);
ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS);
cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max);
ndbassert(cnt <= max);
ndbrequire(cnt <= max);
producer.m_current_pos.m_ptr_i -= cnt;
producer.m_current_page.m_idx -= cnt;
}
ndbassert(free >= cnt * File_formats::UNDO_PAGE_WORDS);
ndbrequire(free >= cnt * File_formats::UNDO_PAGE_WORDS);
free -= (cnt * File_formats::UNDO_PAGE_WORDS);
ptr.p->m_free_buffer_words = free;
ptr.p->m_pos[PRODUCER] = producer;
......@@ -2476,7 +2536,7 @@ Uint32
Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr,
Uint32 pageId, Uint32 pages)
{
ndbassert(pages);
ndbrequire(pages);
Ptr<Undofile> filePtr;
Buffer_idx tail= ptr.p->m_file_pos[TAIL];
m_file_pool.getPtr(filePtr, tail.m_ptr_i);
......@@ -2525,7 +2585,7 @@ Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr,
{
jam();
ndbassert(tail.m_idx - max == 0);
ndbrequire(tail.m_idx - max == 0);
req->varIndex = 1;
req->numberOfPages = max;
req->data.pageData[0] = pageId - max;
......@@ -2651,7 +2711,7 @@ Lgman::get_next_undo_record(Uint64 * this_lsn)
Uint32 *record= pageP->m_data + pos - 1;
Uint32 len= (* record) & 0xFFFF;
ndbassert(len);
ndbrequire(len);
Uint32 *prev= record - len;
Uint64 lsn = 0;
......@@ -2663,7 +2723,7 @@ Lgman::get_next_undo_record(Uint64 * this_lsn)
}
else
{
ndbassert(pos >= 3);
ndbrequire(pos >= 3);
lsn += * (prev - 1); lsn <<= 32;
lsn += * (prev - 0);
len += 2;
......@@ -2671,14 +2731,14 @@ Lgman::get_next_undo_record(Uint64 * this_lsn)
}
ndbassert(pos >= len);
ndbrequire(pos >= len);
if(pos == len)
{
/**
* Switching page
*/
ndbassert(producer.m_current_pos.m_idx);
ndbrequire(producer.m_current_pos.m_idx);
ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --;
if(consumer.m_current_page.m_idx)
......@@ -2723,7 +2783,7 @@ Lgman::get_next_undo_record(Uint64 * this_lsn)
prev = pageP->m_data + pos - 1;
if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN)
ndbassert(lsn + 1 == ptr.p->m_last_read_lsn);
ndbrequire(lsn + 1 == ptr.p->m_last_read_lsn);
ptr.p->m_pos[CONSUMER] = consumer;
ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS;
......@@ -2785,7 +2845,7 @@ Lgman::stop_run_undo_log(Signal* signal)
/**
* Fix log TAIL
*/
ndbassert(ptr.p->m_state == 0);
ndbrequire(ptr.p->m_state == 0);
ptr.p->m_state = Logfile_group::LG_ONLINE;
Buffer_idx tail= ptr.p->m_file_pos[TAIL];
Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx;
......@@ -2796,7 +2856,7 @@ Lgman::stop_run_undo_log(Signal* signal)
m_file_pool.getPtr(file, tail.m_ptr_i);
Uint32 page= tail.m_idx;
Uint32 size= file.p->m_file_size;
ndbassert(size >= page);
ndbrequire(size >= page);
Uint32 diff= size - page;
if(pages >= diff)
......@@ -2827,7 +2887,8 @@ Lgman::stop_run_undo_log(Signal* signal)
ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD;
signal->theData[0] = LgmanContinueB::FLUSH_LOG;
signal->theData[1] = ptr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB);
signal->theData[2] = 0;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
if(1)
{
......@@ -2914,7 +2975,7 @@ Lgman::execEND_LCP_CONF(Signal* signal)
undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16;
Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2);
memcpy(dst, undo, sizeof(undo));
ndbassert(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2));
ptr.p->m_free_file_words -= (sizeof(undo) >> 2);
}
else
......@@ -2923,7 +2984,7 @@ Lgman::execEND_LCP_CONF(Signal* signal)
* dst++ = last_lsn >> 32;
* dst++ = last_lsn & 0xFFFFFFFF;
memcpy(dst, undo, sizeof(undo));
ndbassert(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
ndbrequire(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2));
ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2);
}
m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1;
......
......@@ -258,7 +258,7 @@ private:
void endlcp_callback(Signal*, Uint32, Uint32);
void open_file(Signal*, Ptr<Undofile>, Uint32 requestInfo);
void flush_log(Signal*, Ptr<Logfile_group>);
void flush_log(Signal*, Ptr<Logfile_group>, Uint32 force);
Uint32 write_log_pages(Signal*, Ptr<Logfile_group>,
Uint32 pageId, Uint32 pages);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment