ndb dd

  Fix a copule of dd bugs
    typo in lgman, potential run-time crash
    inconsistency in tup, potentially causing crash after SR
parent e29db067
...@@ -2632,7 +2632,7 @@ private: ...@@ -2632,7 +2632,7 @@ private:
void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused); void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused);
void disk_page_set_dirty(Ptr<Page>); void disk_page_set_dirty(Ptr<Page>);
void restart_setup_page(Ptr<Page>); void restart_setup_page(Disk_alloc_info&, Ptr<Page>);
void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>); void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>);
/** /**
......
...@@ -231,6 +231,16 @@ void ...@@ -231,6 +231,16 @@ void
Dbtup::update_extent_pos(Disk_alloc_info& alloc, Dbtup::update_extent_pos(Disk_alloc_info& alloc,
Ptr<Extent_info> extentPtr) Ptr<Extent_info> extentPtr)
{ {
#ifdef VM_TRACE
Uint32 min_free = 0;
for(Uint32 i = 0; i<MAX_FREE_LIST; i++)
{
Uint32 sum = alloc.calc_page_free_space(i);
min_free += sum * extentPtr.p->m_free_page_count[i];
}
ddassert(extentPtr.p->m_free_space >= min_free);
#endif
Uint32 old = extentPtr.p->m_free_matrix_pos; Uint32 old = extentPtr.p->m_free_matrix_pos;
if (old != RNIL) if (old != RNIL)
{ {
...@@ -252,7 +262,7 @@ Dbtup::update_extent_pos(Disk_alloc_info& alloc, ...@@ -252,7 +262,7 @@ Dbtup::update_extent_pos(Disk_alloc_info& alloc,
} }
void void
Dbtup::restart_setup_page(Ptr<Page> pagePtr) Dbtup::restart_setup_page(Disk_alloc_info& alloc, Ptr<Page> pagePtr)
{ {
/** /**
* Link to extent, clear uncommitted_used_space * Link to extent, clear uncommitted_used_space
...@@ -266,6 +276,17 @@ Dbtup::restart_setup_page(Ptr<Page> pagePtr) ...@@ -266,6 +276,17 @@ Dbtup::restart_setup_page(Ptr<Page> pagePtr)
Ptr<Extent_info> extentPtr; Ptr<Extent_info> extentPtr;
ndbrequire(c_extent_hash.find(extentPtr, key)); ndbrequire(c_extent_hash.find(extentPtr, key));
pagePtr.p->m_extent_info_ptr = extentPtr.i; pagePtr.p->m_extent_info_ptr = extentPtr.i;
Uint32 idx = pagePtr.p->list_index & ~0x8000;
Uint32 estimated = alloc.calc_page_free_space(idx);
Uint32 real_free = pagePtr.p->free_space;
ddassert(real_free >= estimated);
if (real_free != estimated)
{
extentPtr.p->m_free_space += (real_free - estimated);
update_extent_pos(alloc, extentPtr);
}
} }
...@@ -608,7 +629,7 @@ Dbtup::disk_page_prealloc_callback(Signal* signal, ...@@ -608,7 +629,7 @@ Dbtup::disk_page_prealloc_callback(Signal* signal,
if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq)) if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{ {
restart_setup_page(pagePtr); restart_setup_page(fragPtr.p->m_disk_alloc_info, pagePtr);
} }
disk_page_prealloc_callback_common(signal, req, fragPtr, pagePtr); disk_page_prealloc_callback_common(signal, req, fragPtr, pagePtr);
...@@ -746,11 +767,6 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr) ...@@ -746,11 +767,6 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
return ; return ;
} }
if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{
restart_setup_page(pagePtr);
}
Local_key key; Local_key key;
key.m_page_no = pagePtr.p->m_page_no; key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no; key.m_file_no = pagePtr.p->m_file_no;
...@@ -772,6 +788,12 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr) ...@@ -772,6 +788,12 @@ Dbtup::disk_page_set_dirty(Ptr<Page> pagePtr)
getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p); getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info; Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
if (unlikely(pagePtr.p->m_restart_seq != globalData.m_restart_seq))
{
restart_setup_page(alloc, pagePtr);
}
Tablespace_client tsman(0, c_tsman, Tablespace_client tsman(0, c_tsman,
fragPtr.p->fragTableId, fragPtr.p->fragTableId,
fragPtr.p->fragmentId, fragPtr.p->fragmentId,
......
...@@ -1087,7 +1087,7 @@ Lgman::force_log_sync(Signal* signal, ...@@ -1087,7 +1087,7 @@ Lgman::force_log_sync(Signal* signal,
* Update free space with extra NOOP * Update free space with extra NOOP
*/ */
ndbrequire(ptr.p->m_free_file_words >= free); ndbrequire(ptr.p->m_free_file_words >= free);
ndbrequire(ptr.p->m_free_buffer_words >= free); ndbrequire(ptr.p->m_free_buffer_words > free);
ptr.p->m_free_file_words -= free; ptr.p->m_free_file_words -= free;
ptr.p->m_free_buffer_words -= free; ptr.p->m_free_buffer_words -= free;
...@@ -1171,7 +1171,7 @@ Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz) ...@@ -1171,7 +1171,7 @@ Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz)
{ {
next: next:
// fits this page wo/ problem // fits this page wo/ problem
ndbrequire(total_free >= sz); ndbrequire(total_free > sz);
ptr.p->m_free_buffer_words = total_free - sz; ptr.p->m_free_buffer_words = total_free - sz;
ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz; ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz;
return ((File_formats::Undofile::Undo_page*)page)->m_data + pos; return ((File_formats::Undofile::Undo_page*)page)->m_data + pos;
...@@ -1340,7 +1340,7 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force) ...@@ -1340,7 +1340,7 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx; Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx;
ndbout_c("force flush %d", free); ndbout_c("force flush %d %d", pos.m_idx, ptr.p->m_free_buffer_words);
ndbrequire(pos.m_idx); // don't flush empty page... ndbrequire(pos.m_idx); // don't flush empty page...
Uint64 lsn= ptr.p->m_last_lsn - 1; Uint64 lsn= ptr.p->m_last_lsn - 1;
...@@ -1355,7 +1355,7 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force) ...@@ -1355,7 +1355,7 @@ Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force)
* Update free space with extra NOOP * Update free space with extra NOOP
*/ */
ndbrequire(ptr.p->m_free_file_words >= free); ndbrequire(ptr.p->m_free_file_words >= free);
ndbrequire(ptr.p->m_free_buffer_words >= free); ndbrequire(ptr.p->m_free_buffer_words > free);
ptr.p->m_free_file_words -= free; ptr.p->m_free_file_words -= free;
ptr.p->m_free_buffer_words -= free; ptr.p->m_free_buffer_words -= free;
...@@ -1486,7 +1486,7 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr) ...@@ -1486,7 +1486,7 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
if(list.isEmpty()) if(list.isEmpty())
{ {
ptr.p->m_state &= (Uint32)Logfile_group::LG_WAITERS_THREAD; ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD;
return; return;
} }
...@@ -1661,6 +1661,7 @@ Lgman::execFSWRITECONF(Signal* signal) ...@@ -1661,6 +1661,7 @@ Lgman::execFSWRITECONF(Signal* signal)
{ {
Uint32 tot= 0; Uint32 tot= 0;
Uint64 lsn = 0; Uint64 lsn = 0;
{
LocalDLFifoList<Undofile> files(m_file_pool, lg_ptr.p->m_files); LocalDLFifoList<Undofile> files(m_file_pool, lg_ptr.p->m_files);
while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING)) while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING))
{ {
...@@ -1677,7 +1678,9 @@ Lgman::execFSWRITECONF(Signal* signal) ...@@ -1677,7 +1678,9 @@ Lgman::execFSWRITECONF(Signal* signal)
if((state & Undofile::FS_MOVE_NEXT) && !files.next(ptr)) if((state & Undofile::FS_MOVE_NEXT) && !files.next(ptr))
files.first(ptr); files.first(ptr);
} }
}
ndbassert(tot);
lg_ptr.p->m_outstanding_fs = cnt; lg_ptr.p->m_outstanding_fs = cnt;
lg_ptr.p->m_free_buffer_words += (tot * File_formats::UNDO_PAGE_WORDS); lg_ptr.p->m_free_buffer_words += (tot * File_formats::UNDO_PAGE_WORDS);
lg_ptr.p->m_next_reply_ptr_i = ptr.i; lg_ptr.p->m_next_reply_ptr_i = ptr.i;
...@@ -1693,6 +1696,10 @@ Lgman::execFSWRITECONF(Signal* signal) ...@@ -1693,6 +1696,10 @@ Lgman::execFSWRITECONF(Signal* signal)
process_log_buffer_waiters(signal, lg_ptr); process_log_buffer_waiters(signal, lg_ptr);
} }
} }
else
{
ndbout_c("miss matched writes");
}
return; return;
} }
...@@ -3004,8 +3011,10 @@ Lgman::execEND_LCP_CONF(Signal* signal) ...@@ -3004,8 +3011,10 @@ Lgman::execEND_LCP_CONF(Signal* signal)
void void
Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading) Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading)
{ {
do
{
if (ptr.p->m_file_pos[HEAD].m_ptr_i == RNIL) if (ptr.p->m_file_pos[HEAD].m_ptr_i == RNIL)
return; break;
Uint32 pages = compute_free_file_pages(ptr); Uint32 pages = compute_free_file_pages(ptr);
...@@ -3027,6 +3036,7 @@ Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading) ...@@ -3027,6 +3036,7 @@ Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading)
ndbrequire(pages >= group_pages); ndbrequire(pages >= group_pages);
} }
} while(0);
} }
#endif #endif
......
...@@ -990,7 +990,7 @@ static int copy_events(Ndb *ndb) ...@@ -990,7 +990,7 @@ static int copy_events(Ndb *ndb)
while ((pOp= ndb->nextEvent())) while ((pOp= ndb->nextEvent()))
{ {
char buf[1024]; char buf[1024];
sprintf(buf, "%s_SHADOW", pOp->getTable()->getName()); sprintf(buf, "%s_SHADOW", pOp->getEvent()->getTable()->getName());
const NdbDictionary::Table *table= dict->getTable(buf); const NdbDictionary::Table *table= dict->getTable(buf);
if (table == 0) if (table == 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment