ndb - bug#21948 & bug#17605

  fix alloc/free extent in undo log
  allow extent to be reused once a lcp is finished (instead of when next lcp starts)
parent 4456144c
......@@ -74,6 +74,8 @@ struct FreeExtentReq {
Local_key key;
Uint32 table_id;
Uint32 tablespace_id;
Uint32 lsn_hi;
Uint32 lsn_lo;
} request;
struct
{
......
......@@ -11276,7 +11276,7 @@ void Dblqh::execLCP_PREPARE_REF(Signal* signal)
/**
* First fragment mean that last LCP is complete :-)
*/
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
}
......@@ -11327,7 +11327,7 @@ void Dblqh::execLCP_PREPARE_CONF(Signal* signal)
/**
* First fragment mean that last LCP is complete :-)
*/
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, signal->length());
EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
jamEntry();
}
......@@ -11611,6 +11611,9 @@ void Dblqh::completeLcpRoundLab(Signal* signal, Uint32 lcpId)
sendSignal(LGMAN_REF, GSN_END_LCP_REQ, signal,
EndLcpReq::SignalLength, JBB);
EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength);
jamEntry();
lcpPtr.i = 0;
ptrAss(lcpPtr, lcpRecord);
lcpPtr.p->m_outstanding = 3;
......
......@@ -625,7 +625,8 @@ struct Fragrecord {
DLList<ScanOp>::Head m_scanList;
bool m_undo_complete;
enum { UC_LCP = 1, UC_CREATE = 2 };
Uint32 m_undo_complete;
Uint32 m_tablespace_id;
Uint32 m_logfile_group_id;
Disk_alloc_info m_disk_alloc_info;
......@@ -989,6 +990,9 @@ ArrayPool<TupTriggerData> c_triggerPool;
,UNDO_UPDATE = File_formats::Undofile::UNDO_TUP_UPDATE
,UNDO_FREE = File_formats::Undofile::UNDO_TUP_FREE
,UNDO_CREATE = File_formats::Undofile::UNDO_TUP_CREATE
,UNDO_DROP = File_formats::Undofile::UNDO_TUP_DROP
,UNDO_ALLOC_EXTENT = File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT
,UNDO_FREE_EXTENT = File_formats::Undofile::UNDO_TUP_FREE_EXTENT
};
struct Alloc
......@@ -1021,6 +1025,30 @@ ArrayPool<TupTriggerData> c_triggerPool;
Uint32 m_table;
Uint32 m_type_length; // 16 bit type, 16 bit length
};
struct Drop
{
Uint32 m_table;
Uint32 m_type_length; // 16 bit type, 16 bit length
};
struct AllocExtent
{
Uint32 m_table;
Uint32 m_fragment;
Uint32 m_page_no;
Uint32 m_file_no;
Uint32 m_type_length;
};
struct FreeExtent
{
Uint32 m_table;
Uint32 m_fragment;
Uint32 m_page_no;
Uint32 m_file_no;
Uint32 m_type_length;
};
};
Extent_info_pool c_extent_pool;
......@@ -1420,7 +1448,7 @@ public:
int nr_delete(Signal*, Uint32, Uint32 fragPtr, const Local_key*, Uint32 gci);
void nr_delete_page_callback(Signal*, Uint32 op, Uint32 page);
void nr_delete_logbuffer_callback(Signal*, Uint32 op, Uint32 page);
void nr_delete_log_buffer_callback(Signal*, Uint32 op, Uint32 page);
private:
BLOCK_DEFINES(Dbtup);
......@@ -2345,9 +2373,10 @@ private:
Uint32 fragId);
void releaseFragment(Signal* signal, Uint32 tableId);
void releaseFragment(Signal* signal, Uint32 tableId, Uint32);
void drop_fragment_free_var_pages(Signal*);
void drop_fragment_free_exent(Signal*, TablerecPtr, FragrecordPtr, Uint32);
void drop_fragment_free_extent(Signal*, TablerecPtr, FragrecordPtr, Uint32);
void drop_fragment_free_extent_log_buffer_callback(Signal*, Uint32, Uint32);
void drop_fragment_unmap_pages(Signal*, TablerecPtr, FragrecordPtr, Uint32);
void drop_fragment_unmap_page_callback(Signal* signal, Uint32, Uint32);
......@@ -2632,6 +2661,9 @@ private:
void disk_page_log_buffer_callback(Signal*, Uint32 opPtrI, Uint32);
void disk_page_alloc_extent_log_buffer_callback(Signal*, Uint32, Uint32);
void disk_page_free_extent_log_buffer_callback(Signal*, Uint32, Uint32);
Uint64 disk_page_undo_alloc(Page*, const Local_key*,
Uint32 sz, Uint32 gci, Uint32 logfile_group_id);
......@@ -2646,6 +2678,9 @@ private:
void undo_createtable_callback(Signal* signal, Uint32 opPtrI, Uint32 unused);
void undo_createtable_logsync_callback(Signal* signal, Uint32, Uint32);
void drop_table_log_buffer_callback(Signal*, Uint32, Uint32);
void drop_table_logsync_callback(Signal*, Uint32, Uint32);
void disk_page_set_dirty(Ptr<Page>);
void restart_setup_page(Disk_alloc_info&, Ptr<Page>);
void update_extent_pos(Disk_alloc_info&, Ptr<Extent_info>);
......@@ -2679,7 +2714,7 @@ public:
private:
void disk_restart_undo_next(Signal*);
void disk_restart_undo_lcp(Uint32, Uint32);
void disk_restart_undo_lcp(Uint32, Uint32, Uint32 flag);
void disk_restart_undo_callback(Signal* signal, Uint32, Uint32);
void disk_restart_undo_alloc(Apply_undo*);
void disk_restart_undo_update(Apply_undo*);
......
......@@ -68,6 +68,26 @@ operator<<(NdbOut& out, const Ptr<Dbtup::Extent_info> & ptr)
return out;
}
#if NOT_YET_FREE_EXTENT
static
inline
bool
check_free(const Dbtup::Extent_info* extP)
{
Uint32 res = 0;
for (Uint32 i = 1; i<MAX_FREE_LIST; i++)
res += extP->m_free_page_count[i];
return res;
}
#error "Code for deallocting extents when they get empty"
#error "This code is not yet complete"
#endif
#if NOT_YET_UNDO_ALLOC_EXTENT
#error "This is needed for deallocting extents when they get empty"
#error "This code is not complete yet"
#endif
void
Dbtup::dump_disk_alloc(Dbtup::Disk_alloc_info & alloc)
{
......@@ -441,10 +461,26 @@ Dbtup::disk_page_prealloc(Signal* signal,
/**
* We need to alloc an extent
*/
#if NOT_YET_UNDO_ALLOC_EXTENT
Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id;
err = c_lgman->alloc_log_space(logfile_group_id,
sizeof(Disk_undo::AllocExtent)>>2);
if(unlikely(err))
{
return -err;
}
#endif
if (!c_extent_pool.seize(ext))
{
jam();
//XXX
err= 2;
#if NOT_YET_UNDO_ALLOC_EXTENT
c_lgman->free_log_space(logfile_group_id,
sizeof(Disk_undo::AllocExtent)>>2);
#endif
c_page_request_pool.release(req);
ndbout_c("no free extent info");
return -err;
......@@ -452,12 +488,44 @@ Dbtup::disk_page_prealloc(Signal* signal,
if ((err= tsman.alloc_extent(&ext.p->m_key)) < 0)
{
jam();
#if NOT_YET_UNDO_ALLOC_EXTENT
c_lgman->free_log_space(logfile_group_id,
sizeof(Disk_undo::AllocExtent)>>2);
#endif
c_extent_pool.release(ext);
c_page_request_pool.release(req);
return err;
}
int pages= err;
#if NOT_YET_UNDO_ALLOC_EXTENT
{
/**
* Do something here
*/
{
Callback cb;
cb.m_callbackData= ext.i;
cb.m_callbackFunction =
safe_cast(&Dbtup::disk_page_alloc_extent_log_buffer_callback);
Uint32 sz= sizeof(Disk_undo::AllocExtent)>>2;
Logfile_client lgman(this, c_lgman, logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
case 0:
break;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
default:
execute(signal, cb, res);
}
}
}
#endif
ndbout << "allocated " << pages << " pages: " << ext.p->m_key << endl;
ext.p->m_first_page_no = ext.p->m_key.m_page_no;
bzero(ext.p->m_free_page_count, sizeof(ext.p->m_free_page_count));
......@@ -1007,6 +1075,12 @@ Dbtup::disk_page_free(Signal *signal,
extentPtr.p->m_free_space += sz;
update_extent_pos(alloc, extentPtr);
#if NOT_YET_FREE_EXTENT
if (check_free(extentPtr.p) == 0)
{
ndbout_c("free: extent is free");
}
#endif
}
void
......@@ -1104,13 +1178,55 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
extentPtr.p->m_free_space += sz;
update_extent_pos(alloc, extentPtr);
#if NOT_YET_FREE_EXTENT
if (check_free(extentPtr.p) == 0)
{
ndbout_c("abort: extent is free");
}
#endif
}
#if NOT_YET_UNDO_ALLOC_EXTENT
void
Dbtup::disk_page_alloc_extent_log_buffer_callback(Signal* signal,
Uint32 extentPtrI,
Uint32 unused)
{
Ptr<Extent_info> extentPtr;
c_extent_pool.getPtr(extentPtr, extentPtrI);
Local_key key = extentPtr.p->m_key;
Tablespace_client2 tsman(signal, c_tsman, &key);
Ptr<Tablerec> tabPtr;
tabPtr.i= tsman.m_table_id;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
Ptr<Fragrecord> fragPtr;
getFragmentrec(fragPtr, tsman.m_fragment_id, tabPtr.p);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
Disk_undo::AllocExtent alloc;
alloc.m_table = tabPtr.i;
alloc.m_fragment = tsman.m_fragment_id;
alloc.m_page_no = key.m_page_no;
alloc.m_file_no = key.m_file_no;
alloc.m_type_length = (Disk_undo::UNDO_ALLOC_EXTENT<<16)|(sizeof(alloc)>> 2);
Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
Uint64 lsn= lgman.add_entry(c, 1);
tsman.update_lsn(&key, lsn);
}
#endif
Uint64
Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key,
Uint32 sz, Uint32 gci, Uint32 logfile_group_id)
{
Logfile_client lsman(this, c_lgman, logfile_group_id);
Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Alloc alloc;
alloc.m_type_length= (Disk_undo::UNDO_ALLOC << 16) | (sizeof(alloc) >> 2);
......@@ -1119,7 +1235,7 @@ Dbtup::disk_page_undo_alloc(Page* page, const Local_key* key,
Logfile_client::Change c[1] = {{ &alloc, sizeof(alloc) >> 2 } };
Uint64 lsn= lsman.add_entry(c, 1);
Uint64 lsn= lgman.add_entry(c, 1);
m_pgman.update_lsn(* key, lsn);
return lsn;
......@@ -1130,7 +1246,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
const Uint32* src, Uint32 sz,
Uint32 gci, Uint32 logfile_group_id)
{
Logfile_client lsman(this, c_lgman, logfile_group_id);
Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Update update;
update.m_page_no = key->m_page_no;
......@@ -1148,7 +1264,7 @@ Dbtup::disk_page_undo_update(Page* page, const Local_key* key,
ndbassert(4*(3 + sz + 1) == (sizeof(update) + 4*sz - 4));
Uint64 lsn= lsman.add_entry(c, 3);
Uint64 lsn= lgman.add_entry(c, 3);
m_pgman.update_lsn(* key, lsn);
return lsn;
......@@ -1159,7 +1275,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
const Uint32* src, Uint32 sz,
Uint32 gci, Uint32 logfile_group_id)
{
Logfile_client lsman(this, c_lgman, logfile_group_id);
Logfile_client lgman(this, c_lgman, logfile_group_id);
Disk_undo::Free free;
free.m_page_no = key->m_page_no;
......@@ -1177,7 +1293,7 @@ Dbtup::disk_page_undo_free(Page* page, const Local_key* key,
ndbassert(4*(3 + sz + 1) == (sizeof(free) + 4*sz - 4));
Uint64 lsn= lsman.add_entry(c, 3);
Uint64 lsn= lgman.add_entry(c, 3);
m_pgman.update_lsn(* key, lsn);
return lsn;
......@@ -1207,7 +1323,7 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
ndbrequire(len == 3);
Uint32 tableId = ptr[1] >> 16;
Uint32 fragId = ptr[1] & 0xFFFF;
disk_restart_undo_lcp(tableId, fragId);
disk_restart_undo_lcp(tableId, fragId, Fragrecord::UC_LCP);
disk_restart_undo_next(signal);
return;
}
......@@ -1246,10 +1362,20 @@ Dbtup::disk_restart_undo(Signal* signal, Uint64 lsn,
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
for(Uint32 i = 0; i<MAX_FRAG_PER_NODE; i++)
if (tabPtr.p->fragrec[i] != RNIL)
disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i]);
disk_restart_undo_lcp(tabPtr.i, tabPtr.p->fragid[i],
Fragrecord::UC_CREATE);
disk_restart_undo_next(signal);
return;
}
case File_formats::Undofile::UNDO_TUP_DROP:
jam();
case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
jam();
case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
jam();
disk_restart_undo_next(signal);
return;
case File_formats::Undofile::UNDO_END:
f_undo_done = true;
return;
......@@ -1283,7 +1409,7 @@ Dbtup::disk_restart_undo_next(Signal* signal)
}
void
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId)
Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId, Uint32 flag)
{
Ptr<Tablerec> tabPtr;
tabPtr.i= tableId;
......@@ -1295,7 +1421,7 @@ Dbtup::disk_restart_undo_lcp(Uint32 tableId, Uint32 fragId)
getFragmentrec(fragPtr, fragId, tabPtr.p);
if (!fragPtr.isNull())
{
fragPtr.p->m_undo_complete = true;
fragPtr.p->m_undo_complete |= flag;
}
}
}
......@@ -1502,6 +1628,12 @@ Dbtup::disk_restart_alloc_extent(Uint32 tableId, Uint32 fragId,
if (tabPtr.p->tableStatus == DEFINED)
{
getFragmentrec(fragPtr, fragId, tabPtr.p);
if (fragPtr.p->m_undo_complete & Fragrecord::UC_CREATE)
{
jam();
return -1;
}
if (!fragPtr.isNull())
{
Disk_alloc_info& alloc= fragPtr.p->m_disk_alloc_info;
......
......@@ -3129,7 +3129,7 @@ Dbtup::nr_delete(Signal* signal, Uint32 senderData,
disk_page_set_dirty(disk_page);
preq.m_callback.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_logbuffer_callback);
safe_cast(&Dbtup::nr_delete_log_buffer_callback);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
res= lgman.get_log_buffer(signal, sz, &preq.m_callback);
switch(res){
......@@ -3182,7 +3182,7 @@ Dbtup::nr_delete_page_callback(Signal* signal,
Callback cb;
cb.m_callbackData = userpointer;
cb.m_callbackFunction =
safe_cast(&Dbtup::nr_delete_logbuffer_callback);
safe_cast(&Dbtup::nr_delete_log_buffer_callback);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
......@@ -3202,7 +3202,7 @@ Dbtup::nr_delete_page_callback(Signal* signal,
}
void
Dbtup::nr_delete_logbuffer_callback(Signal* signal,
Dbtup::nr_delete_log_buffer_callback(Signal* signal,
Uint32 userpointer,
Uint32 unused)
{
......
......@@ -164,7 +164,7 @@ void Dbtup::execCONTINUEB(Signal* signal)
break;
case ZREL_FRAG:
ljam();
releaseFragment(signal, dataPtr);
releaseFragment(signal, dataPtr, signal->theData[2]);
break;
case ZREPORT_MEMORY_USAGE:{
ljam();
......@@ -223,7 +223,7 @@ void Dbtup::execCONTINUEB(Signal* signal)
fragPtr.i= signal->theData[2];
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
drop_fragment_free_exent(signal, tabPtr, fragPtr, signal->theData[3]);
drop_fragment_free_extent(signal, tabPtr, fragPtr, signal->theData[3]);
return;
}
case ZUNMAP_PAGES:
......
......@@ -309,6 +309,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
if(lastAttr)
{
jam();
/**
* Init Disk_alloc_info
*/
......@@ -320,6 +321,11 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ndbrequire(tsman.get_tablespace_info(&rep) == 0);
regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id;
}
else
{
jam();
regFragPtr.p->m_logfile_group_id = RNIL;
}
new (&regFragPtr.p->m_disk_alloc_info)
Disk_alloc_info(regTabPtr.p, rep.tablespace.extent_size);
releaseFragoperrec(fragOperPtr);
......@@ -565,6 +571,11 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ndbrequire(tsman.get_tablespace_info(&rep) == 0);
regFragPtr.p->m_logfile_group_id= rep.tablespace.logfile_group_id;
}
else
{
jam();
regFragPtr.p->m_logfile_group_id = RNIL;
}
new (&regFragPtr.p->m_disk_alloc_info)
Disk_alloc_info(regTabPtr.p, rep.tablespace.extent_size);
......@@ -597,7 +608,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
}
execute(signal, cb, 0);
execute(signal, cb, regFragPtr.p->m_logfile_group_id);
return;
}
}
......@@ -874,7 +885,8 @@ Dbtup::execDROP_TAB_REQ(Signal* signal)
signal->theData[0]= ZREL_FRAG;
signal->theData[1]= tabPtr.i;
sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
signal->theData[2]= RNIL;
sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
}
void Dbtup::releaseTabDescr(Tablerec* const regTabPtr)
......@@ -902,7 +914,8 @@ void Dbtup::releaseTabDescr(Tablerec* const regTabPtr)
}
}
void Dbtup::releaseFragment(Signal* signal, Uint32 tableId)
void Dbtup::releaseFragment(Signal* signal, Uint32 tableId,
Uint32 logfile_group_id)
{
TablerecPtr tabPtr;
tabPtr.i= tableId;
......@@ -930,15 +943,34 @@ void Dbtup::releaseFragment(Signal* signal, Uint32 tableId)
return;
}
DropTabConf * const dropConf= (DropTabConf *)signal->getDataPtrSend();
dropConf->senderRef= reference();
dropConf->senderData= tabPtr.p->m_dropTable.tabUserPtr;
dropConf->tableId= tabPtr.i;
sendSignal(tabPtr.p->m_dropTable.tabUserRef, GSN_DROP_TAB_CONF,
signal, DropTabConf::SignalLength, JBB);
#if NOT_YET_UNDO_DROP_TABLE
#error "This code is complete, but I prefer not to enable it until I need it"
if (logfile_group_id != RNIL)
{
Callback cb;
cb.m_callbackData= tabPtr.i;
cb.m_callbackFunction =
safe_cast(&Dbtup::drop_table_log_buffer_callback);
Uint32 sz= sizeof(Disk_undo::Drop) >> 2;
int r0 = c_lgman->alloc_log_space(logfile_group_id, sz);
releaseTabDescr(tabPtr.p);
initTab(tabPtr.p);
Logfile_client lgman(this, c_lgman, logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
case 0:
ljam();
return;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
default:
execute(signal, cb, logfile_group_id);
return;
}
}
#endif
drop_table_logsync_callback(signal, tabPtr.i, RNIL);
}
void
......@@ -965,7 +997,7 @@ Dbtup::drop_fragment_unmap_pages(Signal *signal,
alloc_info.m_curr_extent_info_ptr_i= RNIL;
}
drop_fragment_free_exent(signal, tabPtr, fragPtr, 0);
drop_fragment_free_extent(signal, tabPtr, fragPtr, 0);
return;
}
......@@ -998,7 +1030,7 @@ Dbtup::drop_fragment_unmap_pages(Signal *signal,
}
return;
}
drop_fragment_free_exent(signal, tabPtr, fragPtr, 0);
drop_fragment_free_extent(signal, tabPtr, fragPtr, 0);
}
void
......@@ -1031,7 +1063,7 @@ Dbtup::drop_fragment_unmap_page_callback(Signal* signal,
}
void
Dbtup::drop_fragment_free_exent(Signal *signal,
Dbtup::drop_fragment_free_extent(Signal *signal,
TablerecPtr tabPtr,
FragrecordPtr fragPtr,
Uint32 pos)
......@@ -1040,6 +1072,125 @@ Dbtup::drop_fragment_free_exent(Signal *signal,
{
Disk_alloc_info& alloc_info= fragPtr.p->m_disk_alloc_info;
for(; pos<EXTENT_SEARCH_MATRIX_SIZE; pos++)
{
if(!alloc_info.m_free_extents[pos].isEmpty())
{
jam();
Callback cb;
cb.m_callbackData= fragPtr.i;
cb.m_callbackFunction =
safe_cast(&Dbtup::drop_fragment_free_extent_log_buffer_callback);
#if NOT_YET_UNDO_FREE_EXTENT
Uint32 sz= sizeof(Disk_undo::FreeExtent) >> 2;
int r0 = c_lgman->alloc_log_space(fragPtr.p->m_logfile_group_id, sz);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
int res= lgman.get_log_buffer(signal, sz, &cb);
switch(res){
case 0:
ljam();
return;
case -1:
ndbrequire("NOT YET IMPLEMENTED" == 0);
break;
default:
execute(signal, cb, fragPtr.p->m_logfile_group_id);
return;
}
#else
execute(signal, cb, fragPtr.p->m_logfile_group_id);
return;
#endif
}
}
ArrayPool<Page> *cheat_pool= (ArrayPool<Page>*)&m_global_page_pool;
for(pos= 0; pos<MAX_FREE_LIST; pos++)
{
ndbrequire(alloc_info.m_page_requests[pos].isEmpty());
LocalDLList<Page> list(* cheat_pool, alloc_info.m_dirty_pages[pos]);
list.remove();
}
}
signal->theData[0] = ZFREE_VAR_PAGES;
signal->theData[1] = tabPtr.i;
signal->theData[2] = fragPtr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
}
void
Dbtup::drop_table_log_buffer_callback(Signal* signal, Uint32 tablePtrI,
Uint32 logfile_group_id)
{
TablerecPtr tabPtr;
tabPtr.i = tablePtrI;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
ndbrequire(tabPtr.p->m_no_of_disk_attributes);
Disk_undo::Drop drop;
drop.m_table = tabPtr.i;
drop.m_type_length =
(Disk_undo::UNDO_DROP << 16) | (sizeof(drop) >> 2);
Logfile_client lgman(this, c_lgman, logfile_group_id);
Logfile_client::Change c[1] = {{ &drop, sizeof(drop) >> 2 } };
Uint64 lsn = lgman.add_entry(c, 1);
Logfile_client::Request req;
req.m_callback.m_callbackData= tablePtrI;
req.m_callback.m_callbackFunction =
safe_cast(&Dbtup::drop_table_logsync_callback);
int ret = lgman.sync_lsn(signal, lsn, &req, 0);
switch(ret){
case 0:
return;
default:
ndbout_c("ret: %d", ret);
ndbrequire(false);
}
}
void
Dbtup::drop_table_logsync_callback(Signal* signal,
Uint32 tabPtrI,
Uint32 logfile_group_id)
{
TablerecPtr tabPtr;
tabPtr.i = tabPtrI;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
DropTabConf * const dropConf= (DropTabConf *)signal->getDataPtrSend();
dropConf->senderRef= reference();
dropConf->senderData= tabPtr.p->m_dropTable.tabUserPtr;
dropConf->tableId= tabPtr.i;
sendSignal(tabPtr.p->m_dropTable.tabUserRef, GSN_DROP_TAB_CONF,
signal, DropTabConf::SignalLength, JBB);
releaseTabDescr(tabPtr.p);
initTab(tabPtr.p);
}
void
Dbtup::drop_fragment_free_extent_log_buffer_callback(Signal* signal,
Uint32 fragPtrI,
Uint32 unused)
{
FragrecordPtr fragPtr;
fragPtr.i = fragPtrI;
ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
TablerecPtr tabPtr;
tabPtr.i = fragPtr.p->fragTableId;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
ndbrequire(tabPtr.p->m_no_of_disk_attributes);
Disk_alloc_info& alloc_info= fragPtr.p->m_disk_alloc_info;
for(Uint32 pos = 0; pos<EXTENT_SEARCH_MATRIX_SIZE; pos++)
{
if(!alloc_info.m_free_extents[pos].isEmpty())
{
......@@ -1049,11 +1200,29 @@ Dbtup::drop_fragment_free_exent(Signal *signal,
Ptr<Extent_info> ext_ptr;
list.first(ext_ptr);
#if NOT_YET_UNDO_FREE_EXTENT
#error "This code is complete"
#error "but not needed until we do dealloc of empty extents"
Disk_undo::FreeExtent free;
free.m_table = tabPtr.i;
free.m_fragment = fragPtr.p->fragmentId;
free.m_file_no = ext_ptr.p->m_key.m_file_no;
free.m_page_no = ext_ptr.p->m_key.m_page_no;
free.m_type_length =
(Disk_undo::UNDO_FREE_EXTENT << 16) | (sizeof(free) >> 2);
Logfile_client lgman(this, c_lgman, fragPtr.p->m_logfile_group_id);
Logfile_client::Change c[1] = {{ &free, sizeof(free) >> 2 } };
Uint64 lsn = lgman.add_entry(c, 1);
#else
Uint64 lsn = 0;
#endif
Tablespace_client tsman(signal, c_tsman, tabPtr.i,
fragPtr.p->fragmentId,
fragPtr.p->m_tablespace_id);
tsman.free_extent(&ext_ptr.p->m_key);
tsman.free_extent(&ext_ptr.p->m_key, lsn);
c_extent_hash.remove(ext_ptr);
list.release(ext_ptr);
......@@ -1065,20 +1234,7 @@ Dbtup::drop_fragment_free_exent(Signal *signal,
return;
}
}
ArrayPool<Page> *cheat_pool= (ArrayPool<Page>*)&m_global_page_pool;
for(pos= 0; pos<MAX_FREE_LIST; pos++)
{
ndbrequire(alloc_info.m_page_requests[pos].isEmpty());
LocalDLList<Page> list(* cheat_pool, alloc_info.m_dirty_pages[pos]);
list.remove();
}
}
signal->theData[0] = ZFREE_VAR_PAGES;
signal->theData[1] = tabPtr.i;
signal->theData[2] = fragPtr.i;
sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB);
ndbrequire(false);
}
void
......@@ -1112,7 +1268,7 @@ Dbtup::drop_fragment_free_var_pages(Signal* signal)
sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
return;
}
Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id ;
releaseFragPages(fragPtr.p);
Uint32 i;
for(i= 0; i<MAX_FRAG_PER_NODE; i++)
......@@ -1126,7 +1282,8 @@ Dbtup::drop_fragment_free_var_pages(Signal* signal)
signal->theData[0]= ZREL_FRAG;
signal->theData[1]= tabPtr.i;
sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
signal->theData[2]= logfile_group_id;
sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
return;
}
......
......@@ -154,6 +154,9 @@ struct File_formats
,UNDO_TUP_UPDATE = 4
,UNDO_TUP_FREE = 5
,UNDO_TUP_CREATE = 6
,UNDO_TUP_DROP = 7
,UNDO_TUP_ALLOC_EXTENT = 8
,UNDO_TUP_FREE_EXTENT = 9
,UNDO_END = 0x7FFF
,UNDO_NEXT_LSN = 0x8000
......
......@@ -1160,13 +1160,14 @@ Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr)
bool removed= false;
Ptr<Log_waiter> waiter;
list.first(waiter);
Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
if(waiter.p->m_sync_lsn <= ptr.p->m_last_synced_lsn)
{
removed= true;
Uint32 block = waiter.p->m_block;
SimulatedBlock* b = globalData.getBlock(block);
b->execute(signal, waiter.p->m_callback, 0);
b->execute(signal, waiter.p->m_callback, logfile_group_id);
list.releaseFirst(waiter);
}
......@@ -1522,12 +1523,13 @@ Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr)
bool removed= false;
Ptr<Log_waiter> waiter;
list.first(waiter);
Uint32 logfile_group_id = ptr.p->m_logfile_group_id;
if(waiter.p->m_size + 2*File_formats::UNDO_PAGE_WORDS < free_buffer)
{
removed= true;
Uint32 block = waiter.p->m_block;
SimulatedBlock* b = globalData.getBlock(block);
b->execute(signal, waiter.p->m_callback, 0);
b->execute(signal, waiter.p->m_callback, logfile_group_id);
list.releaseFirst(waiter);
}
......@@ -2699,6 +2701,9 @@ Lgman::execute_undo_record(Signal* signal)
case File_formats::Undofile::UNDO_TUP_UPDATE:
case File_formats::Undofile::UNDO_TUP_FREE:
case File_formats::Undofile::UNDO_TUP_CREATE:
case File_formats::Undofile::UNDO_TUP_DROP:
case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
tup->disk_restart_undo(signal, lsn, mask, ptr - len + 1, len);
return;
default:
......
......@@ -304,6 +304,8 @@ print_undo_page(int count, void* ptr, Uint32 sz){
case File_formats::Undofile::UNDO_LCP:
printf("[ %lld LCP %d tab: %d frag: %d ]", lsn,
src[0], src[1] >> 16, src[1] & 0xFFFF);
if(g_verbosity <= 3)
printf("\n");
break;
case File_formats::Undofile::UNDO_TUP_ALLOC:
if(g_verbosity > 3)
......@@ -340,6 +342,48 @@ print_undo_page(int count, void* ptr, Uint32 sz){
req->m_gci);
}
break;
case File_formats::Undofile::UNDO_TUP_CREATE:
{
Dbtup::Disk_undo::Create *req = (Dbtup::Disk_undo::Create*)src;
printf("[ %lld Create %d ]", lsn, req->m_table);
if(g_verbosity <= 3)
printf("\n");
break;
}
case File_formats::Undofile::UNDO_TUP_DROP:
{
Dbtup::Disk_undo::Drop *req = (Dbtup::Disk_undo::Drop*)src;
printf("[ %lld Drop %d ]", lsn, req->m_table);
if(g_verbosity <= 3)
printf("\n");
break;
}
case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT:
{
Dbtup::Disk_undo::AllocExtent *req = (Dbtup::Disk_undo::AllocExtent*)src;
printf("[ %lld AllocExtent tab: %d frag: %d file: %d page: %d ]",
lsn,
req->m_table,
req->m_fragment,
req->m_file_no,
req->m_page_no);
if(g_verbosity <= 3)
printf("\n");
break;
}
case File_formats::Undofile::UNDO_TUP_FREE_EXTENT:
{
Dbtup::Disk_undo::FreeExtent *req = (Dbtup::Disk_undo::FreeExtent*)src;
printf("[ %lld FreeExtent tab: %d frag: %d file: %d page: %d ]",
lsn,
req->m_table,
req->m_fragment,
req->m_file_no,
req->m_page_no);
if(g_verbosity <= 3)
printf("\n");
break;
}
default:
ndbout_c("[ Unknown type %d len: %d, pos: %d ]", type, len, pos);
if(!(len && type))
......
This diff is collapsed.
......@@ -63,6 +63,7 @@ protected:
void execALLOC_PAGE_REQ(Signal* signal);
void execLCP_FRAG_ORD(Signal*);
void execEND_LCP_REQ(Signal*);
void end_lcp(Signal*, Uint32 tablespace, Uint32 list, Uint32 file);
......@@ -108,6 +109,7 @@ public:
Uint32 m_offset_data_pages; // 1(zero) + extent header pages
Uint32 m_data_pages;
Uint32 m_used_extent_cnt;
Uint32 m_extent_headers_per_extent_page;
} m_online;
struct {
Uint32 m_senderData;
......@@ -196,6 +198,7 @@ private:
Datafile_pool m_file_pool;
Tablespace_pool m_tablespace_pool;
bool m_lcp_ongoing;
Datafile_hash m_file_hash;
Tablespace_list m_tablespace_list;
Tablespace_hash m_tablespace_hash;
......@@ -226,15 +229,52 @@ private:
void release_extent_pages(Signal* signal, Ptr<Datafile> ptr);
void release_extent_pages_callback(Signal*, Uint32, Uint32);
struct req
{
Uint32 m_extent_pages;
Uint32 m_extent_size;
Uint32 m_extent_no; // on extent page
Uint32 m_extent_page_no;
};
struct req lookup_extent(Uint32 page_no, const Datafile*) const;
Uint32 calc_page_no_in_extent(Uint32 page_no, const struct req* val) const;
};
inline
Tsman::req
Tsman::lookup_extent(Uint32 page_no, const Datafile * filePtrP) const
{
struct req val;
val.m_extent_size = filePtrP->m_extent_size;
val.m_extent_pages = filePtrP->m_online.m_offset_data_pages;
Uint32 per_page = filePtrP->m_online.m_extent_headers_per_extent_page;
Uint32 extent =
(page_no - val.m_extent_pages) / val.m_extent_size + per_page;
val.m_extent_page_no = extent / per_page;
val.m_extent_no = extent % per_page;
return val;
}
inline
Uint32
Tsman::calc_page_no_in_extent(Uint32 page_no, const Tsman::req* val) const
{
return (page_no - val->m_extent_pages) % val->m_extent_size;
}
class Tablespace_client
{
public:
Tsman * m_tsman;
Signal* m_signal;
Uint32 m_table_id;
Uint32 m_fragment_id;
Uint32 m_tablespace_id;
public:
Tablespace_client(Signal* signal, Tsman* tsman,
Uint32 table, Uint32 fragment, Uint32 tablespaceId) {
......@@ -245,6 +285,8 @@ public:
m_tablespace_id= tablespaceId;
}
Tablespace_client(Signal* signal, Tsman* tsman, Local_key* key);
/**
* Return >0 if success, no of pages in extent, sets key
* <0 if failure, -error code
......@@ -274,7 +316,7 @@ public:
/**
* Free extent
*/
int free_extent(Local_key* key);
int free_extent(Local_key* key, Uint64 lsn);
/**
* Update page free bits
......@@ -307,6 +349,11 @@ public:
* <0 - on error
*/
int get_tablespace_info(CreateFilegroupImplReq* rep);
/**
* Update lsn of page corresponing to key
*/
int update_lsn(Local_key* key, Uint64 lsn);
};
#include <signaldata/Extent.hpp>
......@@ -351,12 +398,14 @@ Tablespace_client::alloc_page_from_extent(Local_key* key, Uint32 bits)
inline
int
Tablespace_client::free_extent(Local_key* key)
Tablespace_client::free_extent(Local_key* key, Uint64 lsn)
{
FreeExtentReq* req = (FreeExtentReq*)m_signal->theData;
req->request.key = *key;
req->request.table_id = m_table_id;
req->request.tablespace_id = m_tablespace_id;
req->request.lsn_hi = (Uint32)(lsn >> 32);
req->request.lsn_lo = (Uint32)(lsn & 0xFFFFFFFF);
m_tsman->execFREE_EXTENT_REQ(m_signal);
if(req->reply.errorCode == 0){
......@@ -407,5 +456,4 @@ Tablespace_client::restart_undo_page_free_bits(Local_key* key,
page_lsn);
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment