/* Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "lgman.hpp" #include "diskpage.hpp" #include <signaldata/FsRef.hpp> #include <signaldata/FsConf.hpp> #include <signaldata/FsOpenReq.hpp> #include <signaldata/FsCloseReq.hpp> #include <signaldata/CreateFilegroupImpl.hpp> #include <signaldata/DropFilegroupImpl.hpp> #include <signaldata/FsReadWriteReq.hpp> #include <signaldata/LCP.hpp> #include <signaldata/SumaImpl.hpp> #include <signaldata/LgmanContinueB.hpp> #include <signaldata/GetTabInfo.hpp> #include "ndbfs/Ndbfs.hpp" #include "dbtup/Dbtup.hpp" #include <EventLogger.hpp> extern EventLogger g_eventLogger; #include <record_types.hpp> /** * ---<a>-----<b>-----<c>-----<d>---> (time) * * <a> = start of lcp 1 * <b> = stop of lcp 1 * <c> = start of lcp 2 * <d> = stop of lcp 2 * * If ndb crashes before <d> * the entire undo log from crash point until <a> has to be applied * * at <d> the undo log can be cut til <c> */ #define DEBUG_UNDO_EXECUTION 0 #define DEBUG_SEARCH_LOG_HEAD 0 Lgman::Lgman(Block_context & ctx) : SimulatedBlock(LGMAN, ctx), m_logfile_group_list(m_logfile_group_pool), m_logfile_group_hash(m_logfile_group_pool) { BLOCK_CONSTRUCTOR(Lgman); // Add received signals addRecSignal(GSN_STTOR, &Lgman::execSTTOR); addRecSignal(GSN_READ_CONFIG_REQ, &Lgman::execREAD_CONFIG_REQ); addRecSignal(GSN_DUMP_STATE_ORD, &Lgman::execDUMP_STATE_ORD); addRecSignal(GSN_CONTINUEB, &Lgman::execCONTINUEB); addRecSignal(GSN_CREATE_FILE_REQ, &Lgman::execCREATE_FILE_REQ); addRecSignal(GSN_CREATE_FILEGROUP_REQ, &Lgman::execCREATE_FILEGROUP_REQ); addRecSignal(GSN_DROP_FILE_REQ, &Lgman::execDROP_FILE_REQ); addRecSignal(GSN_DROP_FILEGROUP_REQ, &Lgman::execDROP_FILEGROUP_REQ); addRecSignal(GSN_FSWRITEREQ, &Lgman::execFSWRITEREQ); addRecSignal(GSN_FSWRITEREF, &Lgman::execFSWRITEREF, true); addRecSignal(GSN_FSWRITECONF, &Lgman::execFSWRITECONF); addRecSignal(GSN_FSOPENREF, &Lgman::execFSOPENREF, true); addRecSignal(GSN_FSOPENCONF, &Lgman::execFSOPENCONF); addRecSignal(GSN_FSCLOSECONF, &Lgman::execFSCLOSECONF); addRecSignal(GSN_FSREADREF, &Lgman::execFSREADREF, true); addRecSignal(GSN_FSREADCONF, &Lgman::execFSREADCONF); addRecSignal(GSN_LCP_FRAG_ORD, &Lgman::execLCP_FRAG_ORD); addRecSignal(GSN_END_LCP_REQ, &Lgman::execEND_LCP_REQ); addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Lgman::execSUB_GCP_COMPLETE_REP); addRecSignal(GSN_START_RECREQ, &Lgman::execSTART_RECREQ); addRecSignal(GSN_END_LCP_CONF, &Lgman::execEND_LCP_CONF); addRecSignal(GSN_GET_TABINFOREQ, &Lgman::execGET_TABINFOREQ); m_last_lsn = 1; m_logfile_group_hash.setSize(10); } Lgman::~Lgman() { } BLOCK_FUNCTIONS(Lgman) void Lgman::execREAD_CONFIG_REQ(Signal* signal) { jamEntry(); const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr(); Uint32 ref = req->senderRef; Uint32 senderData = req->senderData; const ndb_mgm_configuration_iterator * p = m_ctx.m_config.getOwnConfigIterator(); ndbrequire(p != 0); Pool_context pc; pc.m_block = this; m_log_waiter_pool.wo_pool_init(RT_LGMAN_LOG_WAITER, pc); m_file_pool.init(RT_LGMAN_FILE, pc); m_logfile_group_pool.init(RT_LGMAN_FILEGROUP, pc); m_data_buffer_pool.setSize(10); ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = senderData; sendSignal(ref, GSN_READ_CONFIG_CONF, signal, ReadConfigConf::SignalLength, JBB); } void Lgman::execSTTOR(Signal* signal) { jamEntry(); sendSTTORRY(signal); return; }//Lgman::execNDB_STTOR() void Lgman::sendSTTORRY(Signal* signal) { signal->theData[0] = 0; signal->theData[3] = 1; signal->theData[4] = 2; signal->theData[5] = 3; signal->theData[6] = 4; signal->theData[7] = 5; signal->theData[8] = 6; signal->theData[9] = 255; // No more start phases from missra sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB); } void Lgman::execCONTINUEB(Signal* signal){ jamEntry(); Uint32 type= signal->theData[0]; Uint32 ptrI = signal->theData[1]; switch(type){ case LgmanContinueB::FILTER_LOG: jam(); break; case LgmanContinueB::CUT_LOG_TAIL: { jam(); Ptr<Logfile_group> ptr; m_logfile_group_pool.getPtr(ptr, ptrI); cut_log_tail(signal, ptr); return; } case LgmanContinueB::FLUSH_LOG: { jam(); Ptr<Logfile_group> ptr; m_logfile_group_pool.getPtr(ptr, ptrI); flush_log(signal, ptr, signal->theData[2]); return; } case LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS: { jam(); Ptr<Logfile_group> ptr; m_logfile_group_pool.getPtr(ptr, ptrI); process_log_buffer_waiters(signal, ptr); return; } case LgmanContinueB::FIND_LOG_HEAD: jam(); Ptr<Logfile_group> ptr; if(ptrI != RNIL) { m_logfile_group_pool.getPtr(ptr, ptrI); find_log_head(signal, ptr); } else { init_run_undo_log(signal); } return; case LgmanContinueB::EXECUTE_UNDO_RECORD: jam(); execute_undo_record(signal); return; case LgmanContinueB::STOP_UNDO_LOG: jam(); stop_run_undo_log(signal); return; case LgmanContinueB::READ_UNDO_LOG: { jam(); Ptr<Logfile_group> ptr; m_logfile_group_pool.getPtr(ptr, ptrI); read_undo_log(signal, ptr); return; } case LgmanContinueB::PROCESS_LOG_SYNC_WAITERS: { jam(); Ptr<Logfile_group> ptr; m_logfile_group_pool.getPtr(ptr, ptrI); process_log_sync_waiters(signal, ptr); return; } case LgmanContinueB::FORCE_LOG_SYNC: { jam(); Ptr<Logfile_group> ptr; m_logfile_group_pool.getPtr(ptr, ptrI); force_log_sync(signal, ptr, signal->theData[2], signal->theData[3]); return; } case LgmanContinueB::DROP_FILEGROUP: { jam(); Ptr<Logfile_group> ptr; m_logfile_group_pool.getPtr(ptr, ptrI); if (ptr.p->m_state & Logfile_group::LG_THREAD_MASK) { jam(); sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, signal->length()); return; } Uint32 ref = signal->theData[2]; Uint32 data = signal->theData[3]; drop_filegroup_drop_files(signal, ptr, ref, data); return; } } } void Lgman::execDUMP_STATE_ORD(Signal* signal){ jamEntry(); if(signal->theData[0] == 12001) { Ptr<Logfile_group> ptr; m_logfile_group_list.first(ptr); while(!ptr.isNull()) { infoEvent("lfg %d state: %x fs: %d lsn " "[ last: %lld s(req): %lld s:ed: %lld lcp: %lld ] waiters: %d %d", ptr.p->m_logfile_group_id, ptr.p->m_state, ptr.p->m_outstanding_fs, ptr.p->m_last_lsn, ptr.p->m_last_sync_req_lsn, ptr.p->m_last_synced_lsn, ptr.p->m_last_lcp_lsn, !ptr.p->m_log_buffer_waiters.isEmpty(), !ptr.p->m_log_sync_waiters.isEmpty()); if (!ptr.p->m_log_buffer_waiters.isEmpty()) { Ptr<Log_waiter> waiter; Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters); list.first(waiter); infoEvent(" free_buffer_words: %d head(waiters).sz: %d %d", ptr.p->m_free_buffer_words, waiter.p->m_size, 2*File_formats::UNDO_PAGE_WORDS); } if (!ptr.p->m_log_sync_waiters.isEmpty()) { Ptr<Log_waiter> waiter; Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_sync_waiters); list.first(waiter); infoEvent(" m_last_synced_lsn: %lld head(waiters %x).m_sync_lsn: %lld", ptr.p->m_last_synced_lsn, waiter.i, waiter.p->m_sync_lsn); while(!waiter.isNull()) { ndbout_c("ptr: %x %p lsn: %lld next: %x", waiter.i, waiter.p, waiter.p->m_sync_lsn, waiter.p->nextList); list.next(waiter); } } m_logfile_group_list.next(ptr); } } } void Lgman::execCREATE_FILEGROUP_REQ(Signal* signal){ jamEntry(); CreateFilegroupImplReq* req= (CreateFilegroupImplReq*)signal->getDataPtr(); Uint32 senderRef = req->senderRef; Uint32 senderData = req->senderData; Ptr<Logfile_group> ptr; CreateFilegroupImplRef::ErrorCode err = CreateFilegroupImplRef::NoError; do { if (m_logfile_group_hash.find(ptr, req->filegroup_id)) { jam(); err = CreateFilegroupImplRef::FilegroupAlreadyExists; break; } if (!m_logfile_group_list.isEmpty()) { jam(); err = CreateFilegroupImplRef::OneLogfileGroupLimit; break; } if (!m_logfile_group_pool.seize(ptr)) { jam(); err = CreateFilegroupImplRef::OutOfFilegroupRecords; break; } new (ptr.p) Logfile_group(req); if (!alloc_logbuffer_memory(ptr, req->logfile_group.buffer_size)) { jam(); err= CreateFilegroupImplRef::OutOfLogBufferMemory; m_logfile_group_pool.release(ptr); break; } m_logfile_group_hash.add(ptr); m_logfile_group_list.add(ptr); if (getNodeState().getNodeRestartInProgress() || getNodeState().getSystemRestartInProgress()) { ptr.p->m_state = Logfile_group::LG_STARTING; } CreateFilegroupImplConf* conf= (CreateFilegroupImplConf*)signal->getDataPtr(); conf->senderData = senderData; conf->senderRef = reference(); sendSignal(senderRef, GSN_CREATE_FILEGROUP_CONF, signal, CreateFilegroupImplConf::SignalLength, JBB); return; } while(0); CreateFilegroupImplRef* ref= (CreateFilegroupImplRef*)signal->getDataPtr(); ref->senderData = senderData; ref->senderRef = reference(); ref->errorCode = err; sendSignal(senderRef, GSN_CREATE_FILEGROUP_REF, signal, CreateFilegroupImplRef::SignalLength, JBB); } void Lgman::execDROP_FILEGROUP_REQ(Signal* signal) { jamEntry(); Uint32 errorCode = 0; DropFilegroupImplReq req = *(DropFilegroupImplReq*)signal->getDataPtr(); do { Ptr<Logfile_group> ptr; if (!m_logfile_group_hash.find(ptr, req.filegroup_id)) { errorCode = DropFilegroupImplRef::NoSuchFilegroup; break; } if (ptr.p->m_version != req.filegroup_version) { errorCode = DropFilegroupImplRef::InvalidFilegroupVersion; break; } switch(req.requestInfo){ case DropFilegroupImplReq::Prepare: break; case DropFilegroupImplReq::Commit: m_logfile_group_list.remove(ptr); ptr.p->m_state |= Logfile_group::LG_DROPPING; signal->theData[0] = LgmanContinueB::DROP_FILEGROUP; signal->theData[1] = ptr.i; signal->theData[2] = req.senderRef; signal->theData[3] = req.senderData; sendSignal(reference(), GSN_CONTINUEB, signal, 4, JBB); return; case DropFilegroupImplReq::Abort: break; default: ndbrequire(false); } } while(0); if (errorCode) { DropFilegroupImplRef* ref = (DropFilegroupImplRef*)signal->getDataPtrSend(); ref->senderRef = reference(); ref->senderData = req.senderData; ref->errorCode = errorCode; sendSignal(req.senderRef, GSN_DROP_FILEGROUP_REF, signal, DropFilegroupImplRef::SignalLength, JBB); } else { DropFilegroupImplConf* conf = (DropFilegroupImplConf*)signal->getDataPtrSend(); conf->senderRef = reference(); conf->senderData = req.senderData; sendSignal(req.senderRef, GSN_DROP_FILEGROUP_CONF, signal, DropFilegroupImplConf::SignalLength, JBB); } } void Lgman::drop_filegroup_drop_files(Signal* signal, Ptr<Logfile_group> ptr, Uint32 ref, Uint32 data) { jam(); ndbrequire(! (ptr.p->m_state & Logfile_group::LG_THREAD_MASK)); ndbrequire(ptr.p->m_outstanding_fs == 0); Local_undofile_list list(m_file_pool, ptr.p->m_files); Ptr<Undofile> file_ptr; if (list.first(file_ptr)) { jam(); ndbrequire(! (file_ptr.p->m_state & Undofile::FS_OUTSTANDING)); file_ptr.p->m_create.m_senderRef = ref; file_ptr.p->m_create.m_senderData = data; create_file_abort(signal, ptr, file_ptr); return; } Local_undofile_list metalist(m_file_pool, ptr.p->m_meta_files); if (metalist.first(file_ptr)) { jam(); metalist.remove(file_ptr); list.add(file_ptr); file_ptr.p->m_create.m_senderRef = ref; file_ptr.p->m_create.m_senderData = data; create_file_abort(signal, ptr, file_ptr); return; } free_logbuffer_memory(ptr); m_logfile_group_hash.release(ptr); DropFilegroupImplConf *conf = (DropFilegroupImplConf*)signal->getDataPtr(); conf->senderData = data; conf->senderRef = reference(); sendSignal(ref, GSN_DROP_FILEGROUP_CONF, signal, DropFilegroupImplConf::SignalLength, JBB); } void Lgman::execCREATE_FILE_REQ(Signal* signal) { jamEntry(); CreateFileImplReq* req= (CreateFileImplReq*)signal->getDataPtr(); Uint32 senderRef = req->senderRef; Uint32 senderData = req->senderData; Uint32 requestInfo = req->requestInfo; Ptr<Logfile_group> ptr; CreateFileImplRef::ErrorCode err = CreateFileImplRef::NoError; do { if (!m_logfile_group_hash.find(ptr, req->filegroup_id)) { jam(); err = CreateFileImplRef::InvalidFilegroup; break; } if (ptr.p->m_version != req->filegroup_version) { jam(); err = CreateFileImplRef::InvalidFilegroupVersion; break; } Ptr<Undofile> file_ptr; switch(requestInfo){ case CreateFileImplReq::Commit: { jam(); ndbrequire(find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id)); file_ptr.p->m_create.m_senderRef = req->senderRef; file_ptr.p->m_create.m_senderData = req->senderData; create_file_commit(signal, ptr, file_ptr); return; } case CreateFileImplReq::Abort: { Uint32 senderRef = req->senderRef; Uint32 senderData = req->senderData; if (find_file_by_id(file_ptr, ptr.p->m_meta_files, req->file_id)) { jam(); file_ptr.p->m_create.m_senderRef = senderRef; file_ptr.p->m_create.m_senderData = senderData; create_file_abort(signal, ptr, file_ptr); } else { CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr(); jam(); conf->senderData = senderData; conf->senderRef = reference(); sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal, CreateFileImplConf::SignalLength, JBB); } return; } default: // prepare break; } if (!m_file_pool.seize(file_ptr)) { jam(); err = CreateFileImplRef::OutOfFileRecords; break; } if(ERROR_INSERTED(15000) || (sizeof(void*) == 4 && req->file_size_hi & 0xFFFFFFFF)) { jam(); if(signal->getNoOfSections()) releaseSections(signal); CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr(); ref->senderData = senderData; ref->senderRef = reference(); ref->errorCode = CreateFileImplRef::FileSizeTooLarge; sendSignal(senderRef, GSN_CREATE_FILE_REF, signal, CreateFileImplRef::SignalLength, JBB); return; } new (file_ptr.p) Undofile(req, ptr.i); Local_undofile_list tmp(m_file_pool, ptr.p->m_meta_files); tmp.add(file_ptr); open_file(signal, file_ptr, req->requestInfo); return; } while(0); CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr(); ref->senderData = senderData; ref->senderRef = reference(); ref->errorCode = err; sendSignal(senderRef, GSN_CREATE_FILE_REF, signal, CreateFileImplRef::SignalLength, JBB); } void Lgman::open_file(Signal* signal, Ptr<Undofile> ptr, Uint32 requestInfo) { FsOpenReq* req = (FsOpenReq*)signal->getDataPtrSend(); req->userReference = reference(); req->userPointer = ptr.i; memset(req->fileNumber, 0, sizeof(req->fileNumber)); FsOpenReq::setVersion(req->fileNumber, 4); // Version 4 = specified filename req->fileFlags = 0; req->fileFlags |= FsOpenReq::OM_READWRITE; req->fileFlags |= FsOpenReq::OM_DIRECT; req->fileFlags |= FsOpenReq::OM_SYNC; switch(requestInfo){ case CreateFileImplReq::Create: req->fileFlags |= FsOpenReq::OM_CREATE_IF_NONE; req->fileFlags |= FsOpenReq::OM_INIT; ptr.p->m_state = Undofile::FS_CREATING; break; case CreateFileImplReq::CreateForce: req->fileFlags |= FsOpenReq::OM_CREATE; req->fileFlags |= FsOpenReq::OM_INIT; ptr.p->m_state = Undofile::FS_CREATING; break; case CreateFileImplReq::Open: req->fileFlags |= FsOpenReq::OM_CHECK_SIZE; ptr.p->m_state = Undofile::FS_OPENING; break; default: ndbrequire(false); } req->page_size = File_formats::NDB_PAGE_SIZE; Uint64 size = (Uint64)ptr.p->m_file_size * (Uint64)File_formats::NDB_PAGE_SIZE; req->file_size_hi = size >> 32; req->file_size_lo = size & 0xFFFFFFFF; // Forward filename sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBB); } void Lgman::execFSWRITEREQ(Signal* signal) { jamEntry(); Ptr<Undofile> ptr; Ptr<GlobalPage> page_ptr; FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtr(); m_file_pool.getPtr(ptr, req->userPointer); m_global_page_pool.getPtr(page_ptr, req->data.pageData[0]); if (req->varIndex == 0) { jam(); File_formats::Undofile::Zero_page* page = (File_formats::Undofile::Zero_page*)page_ptr.p; page->m_page_header.init(File_formats::FT_Undofile, getOwnNodeId(), ndbGetOwnVersion(), time(0)); page->m_file_id = ptr.p->m_file_id; page->m_logfile_group_id = ptr.p->m_create.m_logfile_group_id; page->m_logfile_group_version = ptr.p->m_create.m_logfile_group_version; page->m_undo_pages = ptr.p->m_file_size - 1; // minus zero page } else { jam(); File_formats::Undofile::Undo_page* page = (File_formats::Undofile::Undo_page*)page_ptr.p; page->m_page_header.m_page_lsn_hi = 0; page->m_page_header.m_page_lsn_lo = 0; page->m_page_header.m_page_type = File_formats::PT_Undopage; page->m_words_used = 0; } } void Lgman::execFSOPENREF(Signal* signal) { jamEntry(); Ptr<Undofile> ptr; Ptr<Logfile_group> lg_ptr; FsRef* ref = (FsRef*)signal->getDataPtr(); Uint32 errCode = ref->errorCode; Uint32 osErrCode = ref->osErrorCode; m_file_pool.getPtr(ptr, ref->userPointer); m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i); { CreateFileImplRef* ref= (CreateFileImplRef*)signal->getDataPtr(); ref->senderData = ptr.p->m_create.m_senderData; ref->senderRef = reference(); ref->errorCode = CreateFileImplRef::FileError; ref->fsErrCode = errCode; ref->osErrCode = osErrCode; sendSignal(ptr.p->m_create.m_senderRef, GSN_CREATE_FILE_REF, signal, CreateFileImplRef::SignalLength, JBB); } Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files); meta.release(ptr); } #define HEAD 0 #define TAIL 1 void Lgman::execFSOPENCONF(Signal* signal) { jamEntry(); Ptr<Undofile> ptr; FsConf* conf = (FsConf*)signal->getDataPtr(); Uint32 fd = conf->filePointer; m_file_pool.getPtr(ptr, conf->userPointer); ptr.p->m_fd = fd; { Uint32 senderRef = ptr.p->m_create.m_senderRef; Uint32 senderData = ptr.p->m_create.m_senderData; CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr(); conf->senderData = senderData; conf->senderRef = reference(); sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal, CreateFileImplConf::SignalLength, JBB); } } bool Lgman::find_file_by_id(Ptr<Undofile>& ptr, Local_undofile_list::Head& head, Uint32 id) { Local_undofile_list list(m_file_pool, head); for(list.first(ptr); !ptr.isNull(); list.next(ptr)) if(ptr.p->m_file_id == id) return true; return false; } void Lgman::create_file_commit(Signal* signal, Ptr<Logfile_group> lg_ptr, Ptr<Undofile> ptr) { Uint32 senderRef = ptr.p->m_create.m_senderRef; Uint32 senderData = ptr.p->m_create.m_senderData; bool first= false; if(ptr.p->m_state == Undofile::FS_CREATING && (lg_ptr.p->m_state & Logfile_group::LG_ONLINE)) { jam(); Local_undofile_list free(m_file_pool, lg_ptr.p->m_files); Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files); first= free.isEmpty(); meta.remove(ptr); if(!first) { /** * Add log file next after current head */ Ptr<Undofile> curr; m_file_pool.getPtr(curr, lg_ptr.p->m_file_pos[HEAD].m_ptr_i); if(free.next(curr)) free.insert(ptr, curr); // inserts before (that's why the extra next) else free.add(ptr); ptr.p->m_state = Undofile::FS_ONLINE | Undofile::FS_EMPTY; } else { /** * First file isn't empty as it can be written to at any time */ free.add(ptr); ptr.p->m_state = Undofile::FS_ONLINE; lg_ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD; signal->theData[0] = LgmanContinueB::FLUSH_LOG; signal->theData[1] = lg_ptr.i; signal->theData[2] = 0; sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB); } } else { ptr.p->m_state = Undofile::FS_SORTING; } ptr.p->m_online.m_lsn = 0; ptr.p->m_online.m_outstanding = 0; Uint64 add= ptr.p->m_file_size - 1; lg_ptr.p->m_free_file_words += add * File_formats::UNDO_PAGE_WORDS; if(first) { jam(); Buffer_idx tmp= { ptr.i, 0 }; lg_ptr.p->m_file_pos[HEAD] = lg_ptr.p->m_file_pos[TAIL] = tmp; /** * Init log tail pointer */ lg_ptr.p->m_tail_pos[0] = tmp; lg_ptr.p->m_tail_pos[1] = tmp; lg_ptr.p->m_tail_pos[2] = tmp; lg_ptr.p->m_next_reply_ptr_i = ptr.i; } validate_logfile_group(lg_ptr, "create_file_commit"); CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr(); conf->senderData = senderData; conf->senderRef = reference(); sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal, CreateFileImplConf::SignalLength, JBB); } void Lgman::create_file_abort(Signal* signal, Ptr<Logfile_group> lg_ptr, Ptr<Undofile> ptr) { if (ptr.p->m_fd == RNIL) { ((FsConf*)signal->getDataPtr())->userPointer = ptr.i; execFSCLOSECONF(signal); return; } FsCloseReq *req= (FsCloseReq*)signal->getDataPtrSend(); req->filePointer = ptr.p->m_fd; req->userReference = reference(); req->userPointer = ptr.i; req->fileFlag = 0; FsCloseReq::setRemoveFileFlag(req->fileFlag, true); sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, FsCloseReq::SignalLength, JBB); } void Lgman::execFSCLOSECONF(Signal* signal) { Ptr<Undofile> ptr; Ptr<Logfile_group> lg_ptr; Uint32 ptrI = ((FsConf*)signal->getDataPtr())->userPointer; m_file_pool.getPtr(ptr, ptrI); Uint32 senderRef = ptr.p->m_create.m_senderRef; Uint32 senderData = ptr.p->m_create.m_senderData; m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i); if (lg_ptr.p->m_state & Logfile_group::LG_DROPPING) { jam(); { Local_undofile_list list(m_file_pool, lg_ptr.p->m_files); list.release(ptr); } drop_filegroup_drop_files(signal, lg_ptr, senderRef, senderData); } else { jam(); Local_undofile_list list(m_file_pool, lg_ptr.p->m_meta_files); list.release(ptr); CreateFileImplConf* conf= (CreateFileImplConf*)signal->getDataPtr(); conf->senderData = senderData; conf->senderRef = reference(); sendSignal(senderRef, GSN_CREATE_FILE_CONF, signal, CreateFileImplConf::SignalLength, JBB); } } void Lgman::execDROP_FILE_REQ(Signal* signal) { jamEntry(); ndbrequire(false); } #define CONSUMER 0 #define PRODUCER 1 Lgman::Logfile_group::Logfile_group(const CreateFilegroupImplReq* req) { m_logfile_group_id = req->filegroup_id; m_version = req->filegroup_version; m_state = LG_ONLINE; m_outstanding_fs = 0; m_next_reply_ptr_i = RNIL; m_last_lsn = 0; m_last_synced_lsn = 0; m_last_sync_req_lsn = 0; m_max_sync_req_lsn = 0; m_last_read_lsn = 0; m_file_pos[0].m_ptr_i= m_file_pos[1].m_ptr_i = RNIL; m_free_file_words = 0; m_free_buffer_words = 0; m_pos[CONSUMER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx } m_pos[CONSUMER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used} m_pos[PRODUCER].m_current_page.m_ptr_i = RNIL;// { m_buffer_pages, idx } m_pos[PRODUCER].m_current_pos.m_ptr_i = RNIL; // { page ptr.i, m_words_used} m_tail_pos[2].m_ptr_i= RNIL; m_tail_pos[2].m_idx= ~0; m_tail_pos[0] = m_tail_pos[1] = m_tail_pos[2]; } bool Lgman::alloc_logbuffer_memory(Ptr<Logfile_group> ptr, Uint32 bytes) { Uint32 pages= (((bytes + 3) >> 2) + File_formats::NDB_PAGE_SIZE_WORDS - 1) / File_formats::NDB_PAGE_SIZE_WORDS; Uint32 requested= pages; { Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages); while(pages) { Uint32 ptrI; Uint32 cnt = pages > 64 ? 64 : pages; m_ctx.m_mm.alloc_pages(RG_DISK_OPERATIONS, &ptrI, &cnt, 1); if (cnt) { Buffer_idx range; range.m_ptr_i= ptrI; range.m_idx = cnt; ndbrequire(map.append((Uint32*)&range, 2)); pages -= range.m_idx; } else { break; } } } if(2*pages > requested) { // less than half allocated free_logbuffer_memory(ptr); return false; } if(pages != 0) { warningEvent("Allocated %d pages for log buffer space, logfile_group: %d" " , requested %d pages", (requested-pages), ptr.p->m_logfile_group_id, requested); } init_logbuffer_pointers(ptr); return true; } void Lgman::init_logbuffer_pointers(Ptr<Logfile_group> ptr) { Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages); Page_map::Iterator it; union { Uint32 tmp[2]; Buffer_idx range; }; map.first(it); tmp[0] = *it.data; ndbrequire(map.next(it)); tmp[1] = *it.data; ptr.p->m_pos[CONSUMER].m_current_page.m_ptr_i = 0; // Index in page map ptr.p->m_pos[CONSUMER].m_current_page.m_idx = range.m_idx - 1;// left range ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = 0; // Page pos ptr.p->m_pos[PRODUCER].m_current_page.m_ptr_i = 0; // Index in page map ptr.p->m_pos[PRODUCER].m_current_page.m_idx = range.m_idx - 1;// left range ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i = range.m_ptr_i; // Which page ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0; // Page pos Uint32 pages= range.m_idx; while(map.next(it)) { tmp[0] = *it.data; ndbrequire(map.next(it)); tmp[1] = *it.data; pages += range.m_idx; } ptr.p->m_free_buffer_words = pages * File_formats::UNDO_PAGE_WORDS; } Uint32 Lgman::compute_free_file_pages(Ptr<Logfile_group> ptr) { Buffer_idx head= ptr.p->m_file_pos[HEAD]; Buffer_idx tail= ptr.p->m_file_pos[TAIL]; Uint32 pages = 0; if (head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx) { pages += tail.m_idx - head.m_idx; } else { Ptr<Undofile> file; m_file_pool.getPtr(file, head.m_ptr_i); Local_undofile_list list(m_file_pool, ptr.p->m_files); do { pages += (file.p->m_file_size - head.m_idx - 1); if(!list.next(file)) list.first(file); head.m_idx = 0; } while(file.i != tail.m_ptr_i); pages += tail.m_idx - head.m_idx; } return pages; } void Lgman::free_logbuffer_memory(Ptr<Logfile_group> ptr) { union { Uint32 tmp[2]; Buffer_idx range; }; Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages); Page_map::Iterator it; map.first(it); while(!it.isNull()) { tmp[0] = *it.data; ndbrequire(map.next(it)); tmp[1] = *it.data; m_ctx.m_mm.release_pages(RG_DISK_OPERATIONS, range.m_ptr_i, range.m_idx); map.next(it); } map.release(); } Lgman::Undofile::Undofile(const struct CreateFileImplReq* req, Uint32 ptrI) { m_fd = RNIL; m_file_id = req->file_id; m_logfile_group_ptr_i= ptrI; Uint64 pages = req->file_size_hi; pages = (pages << 32) | req->file_size_lo; pages /= GLOBAL_PAGE_SIZE; m_file_size = pages; m_create.m_senderRef = req->senderRef; // During META m_create.m_senderData = req->senderData; // During META m_create.m_logfile_group_id = req->filegroup_id; } Logfile_client::Logfile_client(SimulatedBlock* block, Lgman* lgman, Uint32 logfile_group_id) { m_block= block->number(); m_lgman= lgman; m_logfile_group_id= logfile_group_id; } int Logfile_client::sync_lsn(Signal* signal, Uint64 lsn, Request* req, Uint32 flags) { Ptr<Lgman::Logfile_group> ptr; if(m_lgman->m_logfile_group_list.first(ptr)) { if(ptr.p->m_last_synced_lsn >= lsn) { return 1; } bool empty= false; Ptr<Lgman::Log_waiter> wait; { Lgman::Local_log_waiter_list list(m_lgman->m_log_waiter_pool, ptr.p->m_log_sync_waiters); empty= list.isEmpty(); if(!list.seize(wait)) return -1; wait.p->m_block= m_block; wait.p->m_sync_lsn= lsn; memcpy(&wait.p->m_callback, &req->m_callback, sizeof(SimulatedBlock::Callback)); ptr.p->m_max_sync_req_lsn = lsn > ptr.p->m_max_sync_req_lsn ? lsn : ptr.p->m_max_sync_req_lsn; } if(ptr.p->m_last_sync_req_lsn < lsn && ! (ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD)) { ptr.p->m_state |= Lgman::Logfile_group::LG_FORCE_SYNC_THREAD; signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC; signal->theData[1] = ptr.i; signal->theData[2] = lsn >> 32; signal->theData[3] = lsn & 0xFFFFFFFF; m_lgman->sendSignalWithDelay(m_lgman->reference(), GSN_CONTINUEB, signal, 10, 4); } return 0; } return -1; } void Lgman::force_log_sync(Signal* signal, Ptr<Logfile_group> ptr, Uint32 lsn_hi, Uint32 lsn_lo) { Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_sync_waiters); Uint64 force_lsn = lsn_hi; force_lsn <<= 32; force_lsn += lsn_lo; if(ptr.p->m_last_sync_req_lsn < force_lsn) { /** * Do force */ Buffer_idx pos= ptr.p->m_pos[PRODUCER].m_current_pos; GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i); Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx; if(pos.m_idx) // don't flush empty page... { Uint64 lsn= ptr.p->m_last_lsn - 1; File_formats::Undofile::Undo_page* undo= (File_formats::Undofile::Undo_page*)page; undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF; undo->m_page_header.m_page_lsn_hi = lsn >> 32; undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free; /** * Update free space with extra NOOP */ ndbrequire(ptr.p->m_free_file_words >= free); ndbrequire(ptr.p->m_free_buffer_words > free); ptr.p->m_free_file_words -= free; ptr.p->m_free_buffer_words -= free; validate_logfile_group(ptr, "force_log_sync"); next_page(ptr.p, PRODUCER); ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0; } } Uint64 max_req_lsn = ptr.p->m_max_sync_req_lsn; if(max_req_lsn > force_lsn && max_req_lsn > ptr.p->m_last_sync_req_lsn) { ndbrequire(ptr.p->m_state & Lgman::Logfile_group::LG_FORCE_SYNC_THREAD); signal->theData[0] = LgmanContinueB::FORCE_LOG_SYNC; signal->theData[1] = ptr.i; signal->theData[2] = max_req_lsn >> 32; signal->theData[3] = max_req_lsn & 0xFFFFFFFF; sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 10, 4); } else { ptr.p->m_state &= ~(Uint32)Lgman::Logfile_group::LG_FORCE_SYNC_THREAD; } } void Lgman::process_log_sync_waiters(Signal* signal, Ptr<Logfile_group> ptr) { Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_sync_waiters); if(list.isEmpty()) { return; } bool removed= false; Ptr<Log_waiter> waiter; list.first(waiter); Uint32 logfile_group_id = ptr.p->m_logfile_group_id; if(waiter.p->m_sync_lsn <= ptr.p->m_last_synced_lsn) { removed= true; Uint32 block = waiter.p->m_block; SimulatedBlock* b = globalData.getBlock(block); b->execute(signal, waiter.p->m_callback, logfile_group_id); list.releaseFirst(waiter); } if(removed && !list.isEmpty()) { ptr.p->m_state |= Logfile_group::LG_SYNC_WAITERS_THREAD; signal->theData[0] = LgmanContinueB::PROCESS_LOG_SYNC_WAITERS; signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); } else { ptr.p->m_state &= ~(Uint32)Logfile_group::LG_SYNC_WAITERS_THREAD; } } Uint32* Lgman::get_log_buffer(Ptr<Logfile_group> ptr, Uint32 sz) { GlobalPage *page; page=m_shared_page_pool.getPtr(ptr.p->m_pos[PRODUCER].m_current_pos.m_ptr_i); Uint32 total_free= ptr.p->m_free_buffer_words; assert(total_free >= sz); Uint32 pos= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx; Uint32 free= File_formats::UNDO_PAGE_WORDS - pos; if(sz <= free) { next: // fits this page wo/ problem ndbrequire(total_free > sz); ptr.p->m_free_buffer_words = total_free - sz; ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = pos + sz; return ((File_formats::Undofile::Undo_page*)page)->m_data + pos; } /** * It didn't fit page...fill page with a NOOP log entry */ Uint64 lsn= ptr.p->m_last_lsn - 1; File_formats::Undofile::Undo_page* undo= (File_formats::Undofile::Undo_page*)page; undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF; undo->m_page_header.m_page_lsn_hi = lsn >> 32; undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free; /** * Update free space with extra NOOP */ ndbrequire(ptr.p->m_free_file_words >= free); ptr.p->m_free_file_words -= free; validate_logfile_group(ptr, "get_log_buffer"); pos= 0; assert(total_free >= free); total_free -= free; page= m_shared_page_pool.getPtr(next_page(ptr.p, PRODUCER)); goto next; } Uint32 Lgman::next_page(Logfile_group* ptrP, Uint32 i) { Uint32 page_ptr_i= ptrP->m_pos[i].m_current_pos.m_ptr_i; Uint32 left_in_range= ptrP->m_pos[i].m_current_page.m_idx; if(left_in_range > 0) { ptrP->m_pos[i].m_current_page.m_idx = left_in_range - 1; ptrP->m_pos[i].m_current_pos.m_ptr_i = page_ptr_i + 1; return page_ptr_i + 1; } else { Lgman::Page_map map(m_data_buffer_pool, ptrP->m_buffer_pages); Uint32 pos= (ptrP->m_pos[i].m_current_page.m_ptr_i + 2) % map.getSize(); Lgman::Page_map::Iterator it; map.position(it, pos); union { Uint32 tmp[2]; Lgman::Buffer_idx range; }; tmp[0] = *it.data; map.next(it); tmp[1] = *it.data; ptrP->m_pos[i].m_current_page.m_ptr_i = pos; // New index in map ptrP->m_pos[i].m_current_page.m_idx = range.m_idx - 1; // Free pages ptrP->m_pos[i].m_current_pos.m_ptr_i = range.m_ptr_i; // Current page // No need to set ptrP->m_current_pos.m_idx, that is set "in higher"-func return range.m_ptr_i; } } int Logfile_client::get_log_buffer(Signal* signal, Uint32 sz, SimulatedBlock::Callback* callback) { sz += 2; // lsn Lgman::Logfile_group key; key.m_logfile_group_id= m_logfile_group_id; Ptr<Lgman::Logfile_group> ptr; if(m_lgman->m_logfile_group_hash.find(ptr, key)) { if(ptr.p->m_free_buffer_words >= (sz + 2*File_formats::UNDO_PAGE_WORDS)&& ptr.p->m_log_buffer_waiters.isEmpty()) { return 1; } bool empty= false; { Ptr<Lgman::Log_waiter> wait; Lgman::Local_log_waiter_list list(m_lgman->m_log_waiter_pool, ptr.p->m_log_buffer_waiters); empty= list.isEmpty(); if(!list.seize(wait)) { return -1; } wait.p->m_size= sz; wait.p->m_block= m_block; memcpy(&wait.p->m_callback, callback,sizeof(SimulatedBlock::Callback)); } return 0; } return -1; } NdbOut& operator<<(NdbOut& out, const Lgman::Buffer_idx& pos) { out << "[ " << pos.m_ptr_i << " " << pos.m_idx << " ]"; return out; } NdbOut& operator<<(NdbOut& out, const Lgman::Logfile_group::Position& pos) { out << "[ (" << pos.m_current_page.m_ptr_i << " " << pos.m_current_page.m_idx << ") (" << pos.m_current_pos.m_ptr_i << " " << pos.m_current_pos.m_idx << ") ]"; return out; } void Lgman::flush_log(Signal* signal, Ptr<Logfile_group> ptr, Uint32 force) { Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER]; Logfile_group::Position producer= ptr.p->m_pos[PRODUCER]; jamEntry(); if(consumer.m_current_page == producer.m_current_page) { #if 0 if (force) { ndbout_c("force: %d ptr.p->m_file_pos[HEAD].m_ptr_i= %x", force, ptr.p->m_file_pos[HEAD].m_ptr_i); ndbout_c("consumer.m_current_page: %d %d producer.m_current_page: %d %d", consumer.m_current_page.m_ptr_i, consumer.m_current_page.m_idx, producer.m_current_page.m_ptr_i, producer.m_current_page.m_idx); } #endif if (! (ptr.p->m_state & Logfile_group::LG_DROPPING)) { jam(); if (ptr.p->m_log_buffer_waiters.isEmpty() || ptr.p->m_outstanding_fs) { force = 0; } if (force < 2) { signal->theData[0] = LgmanContinueB::FLUSH_LOG; signal->theData[1] = ptr.i; signal->theData[2] = force + 1; sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, force ? 10 : 100, 3); return; } else { Buffer_idx pos= producer.m_current_pos; GlobalPage *page = m_shared_page_pool.getPtr(pos.m_ptr_i); Uint32 free= File_formats::UNDO_PAGE_WORDS - pos.m_idx; ndbout_c("force flush %d %d", pos.m_idx, ptr.p->m_free_buffer_words); ndbrequire(pos.m_idx); // don't flush empty page... Uint64 lsn= ptr.p->m_last_lsn - 1; File_formats::Undofile::Undo_page* undo= (File_formats::Undofile::Undo_page*)page; undo->m_page_header.m_page_lsn_lo = lsn & 0xFFFFFFFF; undo->m_page_header.m_page_lsn_hi = lsn >> 32; undo->m_words_used= File_formats::UNDO_PAGE_WORDS - free; /** * Update free space with extra NOOP */ ndbrequire(ptr.p->m_free_file_words >= free); ndbrequire(ptr.p->m_free_buffer_words > free); ptr.p->m_free_file_words -= free; ptr.p->m_free_buffer_words -= free; validate_logfile_group(ptr, "force_log_flush"); next_page(ptr.p, PRODUCER); ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 0; producer = ptr.p->m_pos[PRODUCER]; // break through } } else { jam(); ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD; return; } } bool full= false; Uint32 tot= 0; while(!(consumer.m_current_page == producer.m_current_page) && !full) { validate_logfile_group(ptr, "before flush log"); Uint32 cnt; // pages written Uint32 page= consumer.m_current_pos.m_ptr_i; if(consumer.m_current_page.m_ptr_i == producer.m_current_page.m_ptr_i) { if(consumer.m_current_page.m_idx > producer.m_current_page.m_idx) { jam(); Uint32 tmp= consumer.m_current_page.m_idx - producer.m_current_page.m_idx; cnt= write_log_pages(signal, ptr, page, tmp); assert(cnt <= tmp); consumer.m_current_pos.m_ptr_i += cnt; consumer.m_current_page.m_idx -= cnt; full= (tmp > cnt); } else { // Only 1 chunk ndbrequire(ptr.p->m_buffer_pages.getSize() == 2); Uint32 tmp= consumer.m_current_page.m_idx + 1; cnt= write_log_pages(signal, ptr, page, tmp); assert(cnt <= tmp); if(cnt == tmp) { jam(); /** * Entire chunk is written * move to next */ ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0; next_page(ptr.p, CONSUMER); consumer = ptr.p->m_pos[CONSUMER]; } else { jam(); /** * Failed to write entire chunk... */ full= true; consumer.m_current_page.m_idx -= cnt; consumer.m_current_pos.m_ptr_i += cnt; } } } else { Uint32 tmp= consumer.m_current_page.m_idx + 1; cnt= write_log_pages(signal, ptr, page, tmp); assert(cnt <= tmp); if(cnt == tmp) { jam(); /** * Entire chunk is written * move to next */ ptr.p->m_pos[CONSUMER].m_current_page.m_idx= 0; next_page(ptr.p, CONSUMER); consumer = ptr.p->m_pos[CONSUMER]; } else { jam(); /** * Failed to write entire chunk... */ full= true; consumer.m_current_page.m_idx -= cnt; consumer.m_current_pos.m_ptr_i += cnt; } } tot += cnt; if(cnt) validate_logfile_group(ptr, " after flush_log"); } ptr.p->m_pos[CONSUMER]= consumer; if (! (ptr.p->m_state & Logfile_group::LG_DROPPING)) { signal->theData[0] = LgmanContinueB::FLUSH_LOG; signal->theData[1] = ptr.i; signal->theData[2] = 0; sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB); } else { ptr.p->m_state &= ~(Uint32)Logfile_group::LG_FLUSH_THREAD; } } void Lgman::process_log_buffer_waiters(Signal* signal, Ptr<Logfile_group> ptr) { Uint32 free_buffer= ptr.p->m_free_buffer_words; Local_log_waiter_list list(m_log_waiter_pool, ptr.p->m_log_buffer_waiters); if(list.isEmpty()) { ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD; return; } bool removed= false; Ptr<Log_waiter> waiter; list.first(waiter); Uint32 logfile_group_id = ptr.p->m_logfile_group_id; if(waiter.p->m_size + 2*File_formats::UNDO_PAGE_WORDS < free_buffer) { removed= true; Uint32 block = waiter.p->m_block; SimulatedBlock* b = globalData.getBlock(block); b->execute(signal, waiter.p->m_callback, logfile_group_id); list.releaseFirst(waiter); } if(removed && !list.isEmpty()) { ptr.p->m_state |= Logfile_group::LG_WAITERS_THREAD; signal->theData[0] = LgmanContinueB::PROCESS_LOG_BUFFER_WAITERS; signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); } else { ptr.p->m_state &= ~(Uint32)Logfile_group::LG_WAITERS_THREAD; } } #define REALLY_SLOW_FS 0 Uint32 Lgman::write_log_pages(Signal* signal, Ptr<Logfile_group> ptr, Uint32 pageId, Uint32 in_pages) { assert(in_pages); Ptr<Undofile> filePtr; Buffer_idx head= ptr.p->m_file_pos[HEAD]; Buffer_idx tail= ptr.p->m_file_pos[TAIL]; m_file_pool.getPtr(filePtr, head.m_ptr_i); if(filePtr.p->m_online.m_outstanding > 0) { jam(); return 0; } Uint32 sz= filePtr.p->m_file_size - 1; // skip zero Uint32 max, pages= in_pages; if(!(head.m_ptr_i == tail.m_ptr_i && head.m_idx < tail.m_idx)) { max= sz - head.m_idx; } else { max= tail.m_idx - head.m_idx; } FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend(); req->filePointer = filePtr.p->m_fd; req->userReference = reference(); req->userPointer = filePtr.i; req->varIndex = 1+head.m_idx; // skip zero page req->numberOfPages = pages; req->data.pageData[0] = pageId; req->operationFlag = 0; FsReadWriteReq::setFormatFlag(req->operationFlag, FsReadWriteReq::fsFormatSharedPage); if(max > pages) { jam(); max= pages; head.m_idx += max; ptr.p->m_file_pos[HEAD] = head; if (REALLY_SLOW_FS) sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS, FsReadWriteReq::FixedLength + 1); else sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, FsReadWriteReq::FixedLength + 1, JBA); ptr.p->m_outstanding_fs++; filePtr.p->m_online.m_outstanding = max; filePtr.p->m_state |= Undofile::FS_OUTSTANDING; File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*) m_shared_page_pool.getPtr(pageId + max - 1); Uint64 lsn = 0; lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += page->m_page_header.m_page_lsn_lo; filePtr.p->m_online.m_lsn = lsn; // Store last writereq lsn on file ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group } else { jam(); req->numberOfPages = max; FsReadWriteReq::setSyncFlag(req->operationFlag, 1); if (REALLY_SLOW_FS) sendSignalWithDelay(NDBFS_REF, GSN_FSWRITEREQ, signal, REALLY_SLOW_FS, FsReadWriteReq::FixedLength + 1); else sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, FsReadWriteReq::FixedLength + 1, JBA); ptr.p->m_outstanding_fs++; filePtr.p->m_online.m_outstanding = max; filePtr.p->m_state |= Undofile::FS_OUTSTANDING; File_formats::Undofile::Undo_page *page= (File_formats::Undofile::Undo_page*) m_shared_page_pool.getPtr(pageId + max - 1); Uint64 lsn = 0; lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += page->m_page_header.m_page_lsn_lo; filePtr.p->m_online.m_lsn = lsn; // Store last writereq lsn on file ptr.p->m_last_sync_req_lsn = lsn; // And logfile_group Ptr<Undofile> next = filePtr; Local_undofile_list files(m_file_pool, ptr.p->m_files); if(!files.next(next)) { jam(); files.first(next); } ndbout_c("changing file from %d to %d", filePtr.i, next.i); filePtr.p->m_state |= Undofile::FS_MOVE_NEXT; next.p->m_state &= ~(Uint32)Undofile::FS_EMPTY; head.m_idx= 0; head.m_ptr_i= next.i; ptr.p->m_file_pos[HEAD] = head; if(max < pages) max += write_log_pages(signal, ptr, pageId + max, pages - max); } assert(max); return max; } void Lgman::execFSWRITEREF(Signal* signal) { jamEntry(); SimulatedBlock::execFSWRITEREF(signal); ndbrequire(false); } void Lgman::execFSWRITECONF(Signal* signal) { jamEntry(); FsConf * conf = (FsConf*)signal->getDataPtr(); Ptr<Undofile> ptr; m_file_pool.getPtr(ptr, conf->userPointer); ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING); ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING; Ptr<Logfile_group> lg_ptr; m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i); Uint32 cnt= lg_ptr.p->m_outstanding_fs; ndbrequire(cnt); if(lg_ptr.p->m_next_reply_ptr_i == ptr.i) { Uint32 tot= 0; Uint64 lsn = 0; { Local_undofile_list files(m_file_pool, lg_ptr.p->m_files); while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING)) { Uint32 state= ptr.p->m_state; Uint32 pages= ptr.p->m_online.m_outstanding; ndbrequire(pages); ptr.p->m_online.m_outstanding= 0; ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT; tot += pages; cnt--; lsn = ptr.p->m_online.m_lsn; if((state & Undofile::FS_MOVE_NEXT) && !files.next(ptr)) files.first(ptr); } } ndbassert(tot); lg_ptr.p->m_outstanding_fs = cnt; lg_ptr.p->m_free_buffer_words += (tot * File_formats::UNDO_PAGE_WORDS); lg_ptr.p->m_next_reply_ptr_i = ptr.i; lg_ptr.p->m_last_synced_lsn = lsn; if(! (lg_ptr.p->m_state & Logfile_group::LG_SYNC_WAITERS_THREAD)) { process_log_sync_waiters(signal, lg_ptr); } if(! (lg_ptr.p->m_state & Logfile_group::LG_WAITERS_THREAD)) { process_log_buffer_waiters(signal, lg_ptr); } } else { ndbout_c("miss matched writes"); } return; } void Lgman::execLCP_FRAG_ORD(Signal* signal) { jamEntry(); LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr(); Uint32 lcp_id= ord->lcpId; Uint32 frag_id = ord->fragmentId; Uint32 table_id = ord->tableId; Ptr<Logfile_group> ptr; m_logfile_group_list.first(ptr); Uint32 entry= lcp_id == m_latest_lcp ? File_formats::Undofile::UNDO_LCP : File_formats::Undofile::UNDO_LCP_FIRST; if(!ptr.isNull() && ! (ptr.p->m_state & Logfile_group::LG_CUT_LOG_THREAD)) { jam(); ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD; signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL; signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); } if(!ptr.isNull() && ptr.p->m_last_lsn) { Uint32 undo[3]; undo[0] = lcp_id; undo[1] = (table_id << 16) | frag_id; undo[2] = (entry << 16 ) | (sizeof(undo) >> 2); Uint64 last_lsn= m_last_lsn; if(ptr.p->m_last_lsn == last_lsn #ifdef VM_TRACE && ((rand() % 100) > 50) #endif ) { undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16; Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2); memcpy(dst, undo, sizeof(undo)); ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2)); ptr.p->m_free_file_words -= (sizeof(undo) >> 2); } else { Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2); * dst++ = last_lsn >> 32; * dst++ = last_lsn & 0xFFFFFFFF; memcpy(dst, undo, sizeof(undo)); ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2)); ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2); } ptr.p->m_last_lcp_lsn = last_lsn; m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1; validate_logfile_group(ptr, "execLCP_FRAG_ORD"); } while(!ptr.isNull()) { if (ptr.p->m_last_lsn) { /** * First LCP_FRAGORD for each LCP, sets tail pos */ if(m_latest_lcp != lcp_id) { ptr.p->m_tail_pos[0] = ptr.p->m_tail_pos[1]; ptr.p->m_tail_pos[1] = ptr.p->m_tail_pos[2]; ptr.p->m_tail_pos[2] = ptr.p->m_file_pos[HEAD]; } if(0) ndbout_c ("execLCP_FRAG_ORD (%d %d) (%d %d) (%d %d) free pages: %ld", ptr.p->m_tail_pos[0].m_ptr_i, ptr.p->m_tail_pos[0].m_idx, ptr.p->m_tail_pos[1].m_ptr_i, ptr.p->m_tail_pos[1].m_idx, ptr.p->m_tail_pos[2].m_ptr_i, ptr.p->m_tail_pos[2].m_idx, (long) (ptr.p->m_free_file_words / File_formats::UNDO_PAGE_WORDS)); } m_logfile_group_list.next(ptr); } m_latest_lcp = lcp_id; } void Lgman::execEND_LCP_REQ(Signal* signal) { EndLcpReq* req= (EndLcpReq*)signal->getDataPtr(); ndbrequire(m_latest_lcp == req->backupId); Ptr<Logfile_group> ptr; m_logfile_group_list.first(ptr); bool wait= false; while(!ptr.isNull()) { Uint64 lcp_lsn = ptr.p->m_last_lcp_lsn; if(ptr.p->m_last_synced_lsn < lcp_lsn) { wait= true; if(signal->getSendersBlockRef() != reference()) { Logfile_client tmp(this, this, ptr.p->m_logfile_group_id); Logfile_client::Request req; req.m_callback.m_callbackData = ptr.i; req.m_callback.m_callbackFunction = safe_cast(&Lgman::endlcp_callback); ndbrequire(tmp.sync_lsn(signal, lcp_lsn, &req, 0) == 0); } } else { ptr.p->m_last_lcp_lsn = 0; } m_logfile_group_list.next(ptr); } if(wait) { return; } signal->theData[0] = 0; sendSignal(DBLQH_REF, GSN_END_LCP_CONF, signal, 1, JBB); } void Lgman::endlcp_callback(Signal* signal, Uint32 ptr, Uint32 res) { EndLcpReq* req= (EndLcpReq*)signal->getDataPtr(); req->backupId = m_latest_lcp; execEND_LCP_REQ(signal); } void Lgman::cut_log_tail(Signal* signal, Ptr<Logfile_group> ptr) { bool done= true; if (likely(ptr.p->m_last_lsn)) { Buffer_idx tmp= ptr.p->m_tail_pos[0]; Buffer_idx tail= ptr.p->m_file_pos[TAIL]; Ptr<Undofile> filePtr; m_file_pool.getPtr(filePtr, tail.m_ptr_i); if(!(tmp == tail)) { Uint32 free; if(tmp.m_ptr_i == tail.m_ptr_i && tail.m_idx < tmp.m_idx) { free= tmp.m_idx - tail.m_idx; ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS; ptr.p->m_file_pos[TAIL] = tmp; } else { free= filePtr.p->m_file_size - tail.m_idx - 1; ptr.p->m_free_file_words += free * File_formats::UNDO_PAGE_WORDS; Ptr<Undofile> next = filePtr; Local_undofile_list files(m_file_pool, ptr.p->m_files); while(files.next(next) && (next.p->m_state & Undofile::FS_EMPTY)) ndbrequire(next.i != filePtr.i); if(next.isNull()) { jam(); files.first(next); while((next.p->m_state & Undofile::FS_EMPTY) && files.next(next)) ndbrequire(next.i != filePtr.i); } tmp.m_idx= 0; tmp.m_ptr_i= next.i; ptr.p->m_file_pos[TAIL] = tmp; done= false; } } validate_logfile_group(ptr, "cut log"); } if (done) { ptr.p->m_state &= ~(Uint32)Logfile_group::LG_CUT_LOG_THREAD; m_logfile_group_list.next(ptr); } if(!done || !ptr.isNull()) { ptr.p->m_state |= Logfile_group::LG_CUT_LOG_THREAD; signal->theData[0] = LgmanContinueB::CUT_LOG_TAIL; signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); } } void Lgman::execSUB_GCP_COMPLETE_REP(Signal* signal) { jamEntry(); Ptr<Logfile_group> ptr; m_logfile_group_list.first(ptr); /** * Filter all logfile groups in parallell */ return; // NOT IMPLETMENT YET signal->theData[0] = LgmanContinueB::FILTER_LOG; while(!ptr.isNull()) { signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); m_logfile_group_list.next(ptr); } } int Lgman::alloc_log_space(Uint32 ref, Uint32 words) { ndbrequire(words); words += 2; // lsn Logfile_group key; key.m_logfile_group_id= ref; Ptr<Logfile_group> ptr; if(m_logfile_group_hash.find(ptr, key) && ptr.p->m_free_file_words >= (words + (4 * File_formats::UNDO_PAGE_WORDS))) { ptr.p->m_free_file_words -= words; validate_logfile_group(ptr, "alloc_log_space"); return 0; } if(ptr.isNull()) { return -1; } return 1501; } int Lgman::free_log_space(Uint32 ref, Uint32 words) { ndbrequire(words); Logfile_group key; key.m_logfile_group_id= ref; Ptr<Logfile_group> ptr; if(m_logfile_group_hash.find(ptr, key)) { ptr.p->m_free_file_words += (words + 2); validate_logfile_group(ptr, "free_log_space"); return 0; } ndbrequire(false); return -1; } Uint64 Logfile_client::add_entry(const Change* src, Uint32 cnt) { Uint32 i, tot= 0; for(i= 0; i<cnt; i++) { tot += src[i].len; } Uint32 *dst; Uint64 last_lsn= m_lgman->m_last_lsn; { Lgman::Logfile_group key; key.m_logfile_group_id= m_logfile_group_id; Ptr<Lgman::Logfile_group> ptr; if(m_lgman->m_logfile_group_hash.find(ptr, key)) { Uint64 last_lsn_filegroup= ptr.p->m_last_lsn; if(last_lsn_filegroup == last_lsn #ifdef VM_TRACE && ((rand() % 100) > 50) #endif ) { dst= m_lgman->get_log_buffer(ptr, tot); for(i= 0; i<cnt; i++) { memcpy(dst, src[i].ptr, 4*src[i].len); dst += src[i].len; } * (dst - 1) |= File_formats::Undofile::UNDO_NEXT_LSN << 16; ptr.p->m_free_file_words += 2; ptr.p->m_free_buffer_words += 2; m_lgman->validate_logfile_group(ptr); } else { dst= m_lgman->get_log_buffer(ptr, tot + 2); * dst++ = last_lsn >> 32; * dst++ = last_lsn & 0xFFFFFFFF; for(i= 0; i<cnt; i++) { memcpy(dst, src[i].ptr, 4*src[i].len); dst += src[i].len; } } } m_lgman->m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1; return last_lsn; } } void Lgman::execSTART_RECREQ(Signal* signal) { m_latest_lcp = signal->theData[0]; Ptr<Logfile_group> ptr; m_logfile_group_list.first(ptr); if(ptr.i != RNIL) { infoEvent("Applying undo to LCP: %d", m_latest_lcp); ndbout_c("Applying undo to LCP: %d", m_latest_lcp); find_log_head(signal, ptr); return; } signal->theData[0] = reference(); sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB); } void Lgman::find_log_head(Signal* signal, Ptr<Logfile_group> ptr) { ndbrequire(ptr.p->m_state & (Logfile_group::LG_STARTING | Logfile_group::LG_SORTING)); if(ptr.p->m_meta_files.isEmpty() && ptr.p->m_files.isEmpty()) { jam(); /** * Logfile_group wo/ any files */ ptr.p->m_state &= ~(Uint32)Logfile_group::LG_STARTING; ptr.p->m_state |= Logfile_group::LG_ONLINE; m_logfile_group_list.next(ptr); signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD; signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); return; } ptr.p->m_state = Logfile_group::LG_SORTING; /** * Read first page from each undofile (1 file at a time...) */ Local_undofile_list files(m_file_pool, ptr.p->m_meta_files); Ptr<Undofile> file_ptr; files.first(file_ptr); if(!file_ptr.isNull()) { /** * Use log buffer memory when reading */ Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i; file_ptr.p->m_online.m_outstanding= page_id; FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend(); req->filePointer = file_ptr.p->m_fd; req->userReference = reference(); req->userPointer = file_ptr.i; req->varIndex = 1; // skip zero page req->numberOfPages = 1; req->data.pageData[0] = page_id; req->operationFlag = 0; FsReadWriteReq::setFormatFlag(req->operationFlag, FsReadWriteReq::fsFormatSharedPage); sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, FsReadWriteReq::FixedLength + 1, JBA); ptr.p->m_outstanding_fs++; file_ptr.p->m_state |= Undofile::FS_OUTSTANDING; return; } else { /** * All files have read first page * and m_files is sorted acording to lsn */ ndbrequire(!ptr.p->m_files.isEmpty()); Local_undofile_list read_files(m_file_pool, ptr.p->m_files); read_files.last(file_ptr); /** * Init binary search */ ptr.p->m_state = Logfile_group::LG_SEARCHING; file_ptr.p->m_state = Undofile::FS_SEARCHING; ptr.p->m_file_pos[TAIL].m_idx = 1; // left page ptr.p->m_file_pos[HEAD].m_idx = file_ptr.p->m_file_size; ptr.p->m_file_pos[HEAD].m_ptr_i = ((file_ptr.p->m_file_size - 1) >> 1) + 1; Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i; file_ptr.p->m_online.m_outstanding= page_id; FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend(); req->filePointer = file_ptr.p->m_fd; req->userReference = reference(); req->userPointer = file_ptr.i; req->varIndex = ptr.p->m_file_pos[HEAD].m_ptr_i; req->numberOfPages = 1; req->data.pageData[0] = page_id; req->operationFlag = 0; FsReadWriteReq::setFormatFlag(req->operationFlag, FsReadWriteReq::fsFormatSharedPage); sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, FsReadWriteReq::FixedLength + 1, JBA); ptr.p->m_outstanding_fs++; file_ptr.p->m_state |= Undofile::FS_OUTSTANDING; return; } } void Lgman::execFSREADCONF(Signal* signal) { jamEntry(); Ptr<Undofile> ptr; Ptr<Logfile_group> lg_ptr; FsConf* conf = (FsConf*)signal->getDataPtr(); m_file_pool.getPtr(ptr, conf->userPointer); m_logfile_group_pool.getPtr(lg_ptr, ptr.p->m_logfile_group_ptr_i); ndbrequire(ptr.p->m_state & Undofile::FS_OUTSTANDING); ptr.p->m_state &= ~(Uint32)Undofile::FS_OUTSTANDING; Uint32 cnt= lg_ptr.p->m_outstanding_fs; ndbrequire(cnt); if((ptr.p->m_state & Undofile::FS_EXECUTING)== Undofile::FS_EXECUTING) { jam(); if(lg_ptr.p->m_next_reply_ptr_i == ptr.i) { Uint32 tot= 0; Local_undofile_list files(m_file_pool, lg_ptr.p->m_files); while(cnt && ! (ptr.p->m_state & Undofile::FS_OUTSTANDING)) { Uint32 state= ptr.p->m_state; Uint32 pages= ptr.p->m_online.m_outstanding; ndbrequire(pages); ptr.p->m_online.m_outstanding= 0; ptr.p->m_state &= ~(Uint32)Undofile::FS_MOVE_NEXT; tot += pages; cnt--; if((state & Undofile::FS_MOVE_NEXT) && !files.prev(ptr)) files.last(ptr); } lg_ptr.p->m_outstanding_fs = cnt; lg_ptr.p->m_pos[PRODUCER].m_current_pos.m_idx += tot; lg_ptr.p->m_next_reply_ptr_i = ptr.i; } return; } lg_ptr.p->m_outstanding_fs = cnt - 1; Ptr<GlobalPage> page_ptr; m_shared_page_pool.getPtr(page_ptr, ptr.p->m_online.m_outstanding); ptr.p->m_online.m_outstanding= 0; File_formats::Undofile::Undo_page* page = (File_formats::Undofile::Undo_page*)page_ptr.p; Uint64 lsn = 0; lsn += page->m_page_header.m_page_lsn_hi; lsn <<= 32; lsn += page->m_page_header.m_page_lsn_lo; switch(ptr.p->m_state){ case Undofile::FS_SORTING: jam(); break; case Undofile::FS_SEARCHING: jam(); find_log_head_in_file(signal, lg_ptr, ptr, lsn); return; default: case Undofile::FS_EXECUTING: case Undofile::FS_CREATING: case Undofile::FS_DROPPING: case Undofile::FS_ONLINE: case Undofile::FS_OPENING: case Undofile::FS_EMPTY: jam(); ndbrequire(false); } /** * Prepare for execution */ ptr.p->m_state = Undofile::FS_EXECUTING; ptr.p->m_online.m_lsn = lsn; /** * Insert into m_files */ { Local_undofile_list meta(m_file_pool, lg_ptr.p->m_meta_files); Local_undofile_list files(m_file_pool, lg_ptr.p->m_files); meta.remove(ptr); Ptr<Undofile> loop; files.first(loop); while(!loop.isNull() && loop.p->m_online.m_lsn <= lsn) files.next(loop); if(loop.isNull()) { /** * File has highest lsn, add last */ jam(); files.add(ptr); } else { /** * Insert file in correct position in file list */ files.insert(ptr, loop); } } find_log_head(signal, lg_ptr); } void Lgman::execFSREADREF(Signal* signal) { jamEntry(); SimulatedBlock::execFSREADREF(signal); ndbrequire(false); } void Lgman::find_log_head_in_file(Signal* signal, Ptr<Logfile_group> ptr, Ptr<Undofile> file_ptr, Uint64 last_lsn) { // a b // 3 4 5 0 1 Uint32 curr= ptr.p->m_file_pos[HEAD].m_ptr_i; Uint32 head= ptr.p->m_file_pos[HEAD].m_idx; Uint32 tail= ptr.p->m_file_pos[TAIL].m_idx; ndbrequire(head > tail); Uint32 diff = head - tail; if(DEBUG_SEARCH_LOG_HEAD) printf("tail: %d(%lld) head: %d last: %d(%lld) -> ", tail, file_ptr.p->m_online.m_lsn, head, curr, last_lsn); if(last_lsn > file_ptr.p->m_online.m_lsn) { if(DEBUG_SEARCH_LOG_HEAD) printf("moving tail "); file_ptr.p->m_online.m_lsn = last_lsn; ptr.p->m_file_pos[TAIL].m_idx = tail = curr; } else { if(DEBUG_SEARCH_LOG_HEAD) printf("moving head "); ptr.p->m_file_pos[HEAD].m_idx = head = curr; } if(diff > 1) { // We need to find more pages to be sure... ptr.p->m_file_pos[HEAD].m_ptr_i = curr = ((head + tail) >> 1); if(DEBUG_SEARCH_LOG_HEAD) ndbout_c("-> new search tail: %d(%lld) head: %d -> %d", tail, file_ptr.p->m_online.m_lsn, head, curr); Uint32 page_id = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i; file_ptr.p->m_online.m_outstanding= page_id; FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend(); req->filePointer = file_ptr.p->m_fd; req->userReference = reference(); req->userPointer = file_ptr.i; req->varIndex = curr; req->numberOfPages = 1; req->data.pageData[0] = page_id; req->operationFlag = 0; FsReadWriteReq::setFormatFlag(req->operationFlag, FsReadWriteReq::fsFormatSharedPage); sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, FsReadWriteReq::FixedLength + 1, JBA); ptr.p->m_outstanding_fs++; file_ptr.p->m_state |= Undofile::FS_OUTSTANDING; return; } ndbrequire(diff == 1); if(DEBUG_SEARCH_LOG_HEAD) ndbout_c("-> found last page: %d", tail); ptr.p->m_state = 0; file_ptr.p->m_state = Undofile::FS_EXECUTING; ptr.p->m_last_lsn = file_ptr.p->m_online.m_lsn; ptr.p->m_last_read_lsn = file_ptr.p->m_online.m_lsn; ptr.p->m_last_synced_lsn = file_ptr.p->m_online.m_lsn; m_last_lsn = file_ptr.p->m_online.m_lsn; /** * Set HEAD position */ ptr.p->m_file_pos[HEAD].m_ptr_i = file_ptr.i; ptr.p->m_file_pos[HEAD].m_idx = tail; ptr.p->m_file_pos[TAIL].m_ptr_i = file_ptr.i; ptr.p->m_file_pos[TAIL].m_idx = tail - 1; ptr.p->m_next_reply_ptr_i = file_ptr.i; { Local_undofile_list files(m_file_pool, ptr.p->m_files); if(tail == 1) { /** * HEAD is first page in a file... * -> PREV should be in previous file */ Ptr<Undofile> prev = file_ptr; if(!files.prev(prev)) { files.last(prev); } ptr.p->m_file_pos[TAIL].m_ptr_i = prev.i; ptr.p->m_file_pos[TAIL].m_idx = prev.p->m_file_size - 1; ptr.p->m_next_reply_ptr_i = prev.i; } SimulatedBlock* fs = globalData.getBlock(NDBFS); infoEvent("Undo head - %s page: %d lsn: %lld", fs->get_filename(file_ptr.p->m_fd), tail, file_ptr.p->m_online.m_lsn); g_eventLogger.info("Undo head - %s page: %d lsn: %lld", fs->get_filename(file_ptr.p->m_fd), tail, file_ptr.p->m_online.m_lsn); for(files.prev(file_ptr); !file_ptr.isNull(); files.prev(file_ptr)) { infoEvent(" - next - %s(%lld)", fs->get_filename(file_ptr.p->m_fd), file_ptr.p->m_online.m_lsn); g_eventLogger.info(" - next - %s(%lld)", fs->get_filename(file_ptr.p->m_fd), file_ptr.p->m_online.m_lsn); } } /** * Start next logfile group */ m_logfile_group_list.next(ptr); signal->theData[0] = LgmanContinueB::FIND_LOG_HEAD; signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); } void Lgman::init_run_undo_log(Signal* signal) { /** * Perform initial sorting of logfile groups */ Ptr<Logfile_group> group; Logfile_group_list& list= m_logfile_group_list; Logfile_group_list tmp(m_logfile_group_pool); list.first(group); while(!group.isNull()) { Ptr<Logfile_group> ptr= group; list.next(group); list.remove(ptr); { /** * Init buffer pointers */ ptr.p->m_free_buffer_words -= File_formats::UNDO_PAGE_WORDS; ptr.p->m_pos[CONSUMER].m_current_page.m_idx = 0; // 0 more pages read ptr.p->m_pos[PRODUCER].m_current_page.m_idx = 0; // 0 more pages read Uint32 page = ptr.p->m_pos[CONSUMER].m_current_pos.m_ptr_i; File_formats::Undofile::Undo_page* pageP = (File_formats::Undofile::Undo_page*)m_shared_page_pool.getPtr(page); ptr.p->m_pos[CONSUMER].m_current_pos.m_idx = pageP->m_words_used; ptr.p->m_pos[PRODUCER].m_current_pos.m_idx = 1; ptr.p->m_last_read_lsn++; } /** * Start producer thread */ signal->theData[0] = LgmanContinueB::READ_UNDO_LOG; signal->theData[1] = ptr.i; sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); /** * Insert in correct position in list of logfile_group's */ Ptr<Logfile_group> pos; for(tmp.first(pos); !pos.isNull(); tmp.next(pos)) if(ptr.p->m_last_read_lsn >= pos.p->m_last_read_lsn) break; if(pos.isNull()) tmp.add(ptr); else tmp.insert(ptr, pos); ptr.p->m_state = Logfile_group::LG_EXEC_THREAD | Logfile_group::LG_READ_THREAD; } list = tmp; execute_undo_record(signal); } void Lgman::read_undo_log(Signal* signal, Ptr<Logfile_group> ptr) { Uint32 cnt, free= ptr.p->m_free_buffer_words; if(! (ptr.p->m_state & Logfile_group::LG_EXEC_THREAD)) { jam(); /** * Logfile_group is done... */ ptr.p->m_state &= ~(Uint32)Logfile_group::LG_READ_THREAD; stop_run_undo_log(signal); return; } if(free <= File_formats::UNDO_PAGE_WORDS) { signal->theData[0] = LgmanContinueB::READ_UNDO_LOG; signal->theData[1] = ptr.i; sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2); return; } Logfile_group::Position producer= ptr.p->m_pos[PRODUCER]; Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER]; if(producer.m_current_page.m_idx == 0) { /** * zero pages left in range -> switch range */ Lgman::Page_map::Iterator it; Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages); Uint32 sz = map.getSize(); Uint32 pos= (producer.m_current_page.m_ptr_i + sz - 2) % sz; map.position(it, pos); union { Uint32 _tmp[2]; Lgman::Buffer_idx range; }; _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data; producer.m_current_page.m_ptr_i = pos; producer.m_current_page.m_idx = range.m_idx; producer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx; } if(producer.m_current_page.m_ptr_i == consumer.m_current_page.m_ptr_i && producer.m_current_pos.m_ptr_i > consumer.m_current_pos.m_ptr_i) { Uint32 max= producer.m_current_pos.m_ptr_i - consumer.m_current_pos.m_ptr_i - 1; ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS); cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max); ndbrequire(cnt <= max); producer.m_current_pos.m_ptr_i -= cnt; producer.m_current_page.m_idx -= cnt; } else { Uint32 max= producer.m_current_page.m_idx; ndbrequire(free >= max * File_formats::UNDO_PAGE_WORDS); cnt= read_undo_pages(signal, ptr, producer.m_current_pos.m_ptr_i, max); ndbrequire(cnt <= max); producer.m_current_pos.m_ptr_i -= cnt; producer.m_current_page.m_idx -= cnt; } ndbrequire(free >= cnt * File_formats::UNDO_PAGE_WORDS); free -= (cnt * File_formats::UNDO_PAGE_WORDS); ptr.p->m_free_buffer_words = free; ptr.p->m_pos[PRODUCER] = producer; signal->theData[0] = LgmanContinueB::READ_UNDO_LOG; signal->theData[1] = ptr.i; if(free > File_formats::UNDO_PAGE_WORDS) sendSignal(reference(), GSN_CONTINUEB, signal, 2, JBB); else sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 2); } Uint32 Lgman::read_undo_pages(Signal* signal, Ptr<Logfile_group> ptr, Uint32 pageId, Uint32 pages) { ndbrequire(pages); Ptr<Undofile> filePtr; Buffer_idx tail= ptr.p->m_file_pos[TAIL]; m_file_pool.getPtr(filePtr, tail.m_ptr_i); if(filePtr.p->m_online.m_outstanding > 0) { jam(); return 0; } Uint32 max= tail.m_idx; FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtrSend(); req->filePointer = filePtr.p->m_fd; req->userReference = reference(); req->userPointer = filePtr.i; req->operationFlag = 0; FsReadWriteReq::setFormatFlag(req->operationFlag, FsReadWriteReq::fsFormatSharedPage); if(max > pages) { jam(); tail.m_idx -= pages; req->varIndex = 1 + tail.m_idx; req->numberOfPages = pages; req->data.pageData[0] = pageId - pages; ptr.p->m_file_pos[TAIL] = tail; if(DEBUG_UNDO_EXECUTION) ndbout_c("a reading from file: %d page(%d-%d) into (%d-%d)", ptr.i, 1 + tail.m_idx, 1+tail.m_idx+pages-1, pageId - pages, pageId - 1); sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, FsReadWriteReq::FixedLength + 1, JBA); ptr.p->m_outstanding_fs++; filePtr.p->m_state |= Undofile::FS_OUTSTANDING; filePtr.p->m_online.m_outstanding = pages; max = pages; } else { jam(); ndbrequire(tail.m_idx - max == 0); req->varIndex = 1; req->numberOfPages = max; req->data.pageData[0] = pageId - max; if(DEBUG_UNDO_EXECUTION) ndbout_c("b reading from file: %d page(%d-%d) into (%d-%d)", ptr.i, 1 , 1+max-1, pageId - max, pageId - 1); sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, FsReadWriteReq::FixedLength + 1, JBA); ptr.p->m_outstanding_fs++; filePtr.p->m_online.m_outstanding = max; filePtr.p->m_state |= Undofile::FS_OUTSTANDING | Undofile::FS_MOVE_NEXT; Ptr<Undofile> prev = filePtr; { Local_undofile_list files(m_file_pool, ptr.p->m_files); if(!files.prev(prev)) { jam(); files.last(prev); } } if(DEBUG_UNDO_EXECUTION) ndbout_c("changing file from %d to %d", filePtr.i, prev.i); tail.m_idx= prev.p->m_file_size - 1; tail.m_ptr_i= prev.i; ptr.p->m_file_pos[TAIL] = tail; if(max < pages && filePtr.i != prev.i) max += read_undo_pages(signal, ptr, pageId - max, pages - max); } return max; } void Lgman::execute_undo_record(Signal* signal) { Uint64 lsn; const Uint32* ptr; Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP); if((ptr = get_next_undo_record(&lsn))) { Uint32 len= (* ptr) & 0xFFFF; Uint32 type= (* ptr) >> 16; Uint32 mask= type & ~(Uint32)File_formats::Undofile::UNDO_NEXT_LSN; switch(mask){ case File_formats::Undofile::UNDO_END: stop_run_undo_log(signal); return; case File_formats::Undofile::UNDO_LCP: case File_formats::Undofile::UNDO_LCP_FIRST: { Uint32 lcp = * (ptr - len + 1); if(m_latest_lcp && lcp > m_latest_lcp) { if (0) { const Uint32 * base = ptr - len + 1; Uint32 lcp = base[0]; Uint32 tableId = base[1] >> 16; Uint32 fragId = base[1] & 0xFFFF; ndbout_c("NOT! ignoring lcp: %u tab: %u frag: %u", lcp, tableId, fragId); } } if(m_latest_lcp == 0 || lcp < m_latest_lcp || (lcp == m_latest_lcp && mask == File_formats::Undofile::UNDO_LCP_FIRST)) { stop_run_undo_log(signal); return; } // Fallthrough } case File_formats::Undofile::UNDO_TUP_ALLOC: case File_formats::Undofile::UNDO_TUP_UPDATE: case File_formats::Undofile::UNDO_TUP_FREE: case File_formats::Undofile::UNDO_TUP_CREATE: case File_formats::Undofile::UNDO_TUP_DROP: case File_formats::Undofile::UNDO_TUP_ALLOC_EXTENT: case File_formats::Undofile::UNDO_TUP_FREE_EXTENT: tup->disk_restart_undo(signal, lsn, mask, ptr - len + 1, len); return; default: ndbrequire(false); } } signal->theData[0] = LgmanContinueB::EXECUTE_UNDO_RECORD; sendSignal(LGMAN_REF, GSN_CONTINUEB, signal, 1, JBB); return; } const Uint32* Lgman::get_next_undo_record(Uint64 * this_lsn) { Ptr<Logfile_group> ptr; m_logfile_group_list.first(ptr); Logfile_group::Position consumer= ptr.p->m_pos[CONSUMER]; Logfile_group::Position producer= ptr.p->m_pos[PRODUCER]; if(producer.m_current_pos.m_idx < 2) { jam(); /** * Wait for fetching pages... */ return 0; } Uint32 pos = consumer.m_current_pos.m_idx; Uint32 page = consumer.m_current_pos.m_ptr_i; File_formats::Undofile::Undo_page* pageP=(File_formats::Undofile::Undo_page*) m_shared_page_pool.getPtr(page); if(pos == 0) { /** * End of log */ pageP->m_data[0] = (File_formats::Undofile::UNDO_END << 16) | 1 ; pageP->m_page_header.m_page_lsn_hi = 0; pageP->m_page_header.m_page_lsn_lo = 0; pos= consumer.m_current_pos.m_idx= pageP->m_words_used = 1; this_lsn = 0; return pageP->m_data; } Uint32 *record= pageP->m_data + pos - 1; Uint32 len= (* record) & 0xFFFF; ndbrequire(len); Uint32 *prev= record - len; Uint64 lsn = 0; // Same page if(((* record) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN) { lsn = ptr.p->m_last_read_lsn - 1; ndbrequire((Int64)lsn >= 0); } else { ndbrequire(pos >= 3); lsn += * (prev - 1); lsn <<= 32; lsn += * (prev - 0); len += 2; ndbrequire((Int64)lsn >= 0); } ndbrequire(pos >= len); if(pos == len) { /** * Switching page */ ndbrequire(producer.m_current_pos.m_idx); ptr.p->m_pos[PRODUCER].m_current_pos.m_idx --; if(consumer.m_current_page.m_idx) { consumer.m_current_page.m_idx--; // left in range consumer.m_current_pos.m_ptr_i --; // page } else { // 0 pages left in range...switch range Lgman::Page_map::Iterator it; Page_map map(m_data_buffer_pool, ptr.p->m_buffer_pages); Uint32 sz = map.getSize(); Uint32 tmp = (consumer.m_current_page.m_ptr_i + sz - 2) % sz; map.position(it, tmp); union { Uint32 _tmp[2]; Lgman::Buffer_idx range; }; _tmp[0] = *it.data; map.next(it); _tmp[1] = *it.data; consumer.m_current_page.m_idx = range.m_idx - 1; // left in range consumer.m_current_page.m_ptr_i = tmp; // pos in map consumer.m_current_pos.m_ptr_i = range.m_ptr_i + range.m_idx - 1; // page } if(DEBUG_UNDO_EXECUTION) ndbout_c("reading from %d", consumer.m_current_pos.m_ptr_i); pageP=(File_formats::Undofile::Undo_page*) m_shared_page_pool.getPtr(consumer.m_current_pos.m_ptr_i); pos= consumer.m_current_pos.m_idx= pageP->m_words_used; Uint64 tmp = 0; tmp += pageP->m_page_header.m_page_lsn_hi; tmp <<= 32; tmp += pageP->m_page_header.m_page_lsn_lo; prev = pageP->m_data + pos - 1; if(((* prev) >> 16) & File_formats::Undofile::UNDO_NEXT_LSN) ndbrequire(lsn + 1 == ptr.p->m_last_read_lsn); ptr.p->m_pos[CONSUMER] = consumer; ptr.p->m_free_buffer_words += File_formats::UNDO_PAGE_WORDS; } else { ptr.p->m_pos[CONSUMER].m_current_pos.m_idx -= len; } * this_lsn = ptr.p->m_last_read_lsn = lsn; /** * Re-sort log file groups */ Ptr<Logfile_group> sort = ptr; if(m_logfile_group_list.next(sort)) { while(!sort.isNull() && sort.p->m_last_read_lsn > lsn) m_logfile_group_list.next(sort); if(sort.i != ptr.p->nextList) { m_logfile_group_list.remove(ptr); if(sort.isNull()) m_logfile_group_list.add(ptr); else m_logfile_group_list.insert(ptr, sort); } } return record; } void Lgman::stop_run_undo_log(Signal* signal) { bool running = false, outstanding = false; Ptr<Logfile_group> ptr; m_logfile_group_list.first(ptr); while(!ptr.isNull()) { /** * Mark exec thread as completed */ ptr.p->m_state &= ~(Uint32)Logfile_group::LG_EXEC_THREAD; if(ptr.p->m_state & Logfile_group::LG_READ_THREAD) { /** * Thread is still running...wait for it to complete */ running = true; } else if(ptr.p->m_outstanding_fs) { outstanding = true; // a FSREADREQ is outstanding...wait for it } else if(ptr.p->m_state != Logfile_group::LG_ONLINE) { /** * Fix log TAIL */ ndbrequire(ptr.p->m_state == 0); ptr.p->m_state = Logfile_group::LG_ONLINE; Buffer_idx tail= ptr.p->m_file_pos[TAIL]; Uint32 pages= ptr.p->m_pos[PRODUCER].m_current_pos.m_idx; while(pages) { Ptr<Undofile> file; m_file_pool.getPtr(file, tail.m_ptr_i); Uint32 page= tail.m_idx; Uint32 size= file.p->m_file_size; ndbrequire(size >= page); Uint32 diff= size - page; if(pages >= diff) { pages -= diff; Local_undofile_list files(m_file_pool, ptr.p->m_files); if(!files.next(file)) files.first(file); tail.m_idx = 1; tail.m_ptr_i= file.i; } else { tail.m_idx += pages; pages= 0; } } ptr.p->m_tail_pos[0] = tail; ptr.p->m_tail_pos[1] = tail; ptr.p->m_tail_pos[2] = tail; ptr.p->m_file_pos[TAIL] = tail; init_logbuffer_pointers(ptr); { Buffer_idx head= ptr.p->m_file_pos[HEAD]; Ptr<Undofile> file; m_file_pool.getPtr(file, head.m_ptr_i); if (head.m_idx == file.p->m_file_size - 1) { Local_undofile_list files(m_file_pool, ptr.p->m_files); if(!files.next(file)) { jam(); files.first(file); } head.m_idx = 0; head.m_ptr_i = file.i; ptr.p->m_file_pos[HEAD] = head; } } ptr.p->m_free_file_words = (Uint64)File_formats::UNDO_PAGE_WORDS * (Uint64)compute_free_file_pages(ptr); ptr.p->m_next_reply_ptr_i = ptr.p->m_file_pos[HEAD].m_ptr_i; ptr.p->m_state |= Logfile_group::LG_FLUSH_THREAD; signal->theData[0] = LgmanContinueB::FLUSH_LOG; signal->theData[1] = ptr.i; signal->theData[2] = 0; sendSignal(reference(), GSN_CONTINUEB, signal, 3, JBB); if(1) { SimulatedBlock* fs = globalData.getBlock(NDBFS); Ptr<Undofile> hf, tf; m_file_pool.getPtr(tf, tail.m_ptr_i); m_file_pool.getPtr(hf, ptr.p->m_file_pos[HEAD].m_ptr_i); infoEvent("Logfile group: %d ", ptr.p->m_logfile_group_id); g_eventLogger.info("Logfile group: %d ", ptr.p->m_logfile_group_id); infoEvent(" head: %s page: %d", fs->get_filename(hf.p->m_fd), ptr.p->m_file_pos[HEAD].m_idx); g_eventLogger.info(" head: %s page: %d", fs->get_filename(hf.p->m_fd), ptr.p->m_file_pos[HEAD].m_idx); infoEvent(" tail: %s page: %d", fs->get_filename(tf.p->m_fd), tail.m_idx); g_eventLogger.info(" tail: %s page: %d", fs->get_filename(tf.p->m_fd), tail.m_idx); } } m_logfile_group_list.next(ptr); } if(running) { jam(); return; } if(outstanding) { jam(); signal->theData[0] = LgmanContinueB::STOP_UNDO_LOG; sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 1); return; } infoEvent("Flushing page cache after undo completion"); g_eventLogger.info("Flushing page cache after undo completion"); /** * Start flushing pages (local, LCP) */ LcpFragOrd * ord = (LcpFragOrd *)signal->getDataPtr(); ord->lcpId = m_latest_lcp; sendSignal(PGMAN_REF, GSN_LCP_FRAG_ORD, signal, LcpFragOrd::SignalLength, JBB); EndLcpReq* req= (EndLcpReq*)signal->getDataPtr(); req->senderRef = reference(); sendSignal(PGMAN_REF, GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength, JBB); } void Lgman::execEND_LCP_CONF(Signal* signal) { Dbtup* tup= (Dbtup*)globalData.getBlock(DBTUP); tup->disk_restart_undo(signal, 0, File_formats::Undofile::UNDO_END, 0, 0); /** * pgman has completed flushing all pages * * insert "fake" LCP record preventing undo to be "rerun" */ Uint32 undo[3]; undo[0] = m_latest_lcp; undo[1] = (0 << 16) | 0; undo[2] = (File_formats::Undofile::UNDO_LCP_FIRST << 16 ) | (sizeof(undo) >> 2); Ptr<Logfile_group> ptr; ndbrequire(m_logfile_group_list.first(ptr)); Uint64 last_lsn= m_last_lsn; if(ptr.p->m_last_lsn == last_lsn #ifdef VM_TRACE && ((rand() % 100) > 50) #endif ) { undo[2] |= File_formats::Undofile::UNDO_NEXT_LSN << 16; Uint32 *dst= get_log_buffer(ptr, sizeof(undo) >> 2); memcpy(dst, undo, sizeof(undo)); ndbrequire(ptr.p->m_free_file_words >= (sizeof(undo) >> 2)); ptr.p->m_free_file_words -= (sizeof(undo) >> 2); } else { Uint32 *dst= get_log_buffer(ptr, (sizeof(undo) >> 2) + 2); * dst++ = last_lsn >> 32; * dst++ = last_lsn & 0xFFFFFFFF; memcpy(dst, undo, sizeof(undo)); ndbrequire(ptr.p->m_free_file_words >= ((sizeof(undo) >> 2) + 2)); ptr.p->m_free_file_words -= ((sizeof(undo) >> 2) + 2); } m_last_lsn = ptr.p->m_last_lsn = last_lsn + 1; ptr.p->m_last_synced_lsn = last_lsn; while(m_logfile_group_list.next(ptr)) ptr.p->m_last_synced_lsn = last_lsn; infoEvent("Flushing complete"); g_eventLogger.info("Flushing complete"); signal->theData[0] = reference(); sendSignal(DBLQH_REF, GSN_START_RECCONF, signal, 1, JBB); } #ifdef VM_TRACE void Lgman::validate_logfile_group(Ptr<Logfile_group> ptr, const char * heading) { do { if (ptr.p->m_file_pos[HEAD].m_ptr_i == RNIL) break; Uint32 pages = compute_free_file_pages(ptr); Uint32 group_pages = ((ptr.p->m_free_file_words + File_formats::UNDO_PAGE_WORDS - 1)/ File_formats::UNDO_PAGE_WORDS) ; Uint32 last = ptr.p->m_free_file_words % File_formats::UNDO_PAGE_WORDS; if(! (pages >= group_pages)) { ndbout << heading << " Tail: " << ptr.p->m_file_pos[TAIL] << " Head: " << ptr.p->m_file_pos[HEAD] << " free: " << group_pages << "(" << last << ")" << " found: " << pages; for(Uint32 i = 0; i<3; i++) { ndbout << " - " << ptr.p->m_tail_pos[i]; } ndbout << endl; ndbrequire(pages >= group_pages); } } while(0); } #endif void Lgman::execGET_TABINFOREQ(Signal* signal) { jamEntry(); if(!assembleFragments(signal)) { return; } GetTabInfoReq * const req = (GetTabInfoReq *)&signal->theData[0]; const Uint32 reqType = req->requestType & (~GetTabInfoReq::LongSignalConf); BlockReference retRef= req->senderRef; Uint32 senderData= req->senderData; Uint32 tableId= req->tableId; if(reqType == GetTabInfoReq::RequestByName){ jam(); if(signal->getNoOfSections()) releaseSections(signal); sendGET_TABINFOREF(signal, req, GetTabInfoRef::NoFetchByName); return; } Logfile_group key; key.m_logfile_group_id= tableId; Ptr<Logfile_group> ptr; m_logfile_group_hash.find(ptr, key); if(ptr.p->m_logfile_group_id != tableId) { jam(); if(signal->getNoOfSections()) releaseSections(signal); sendGET_TABINFOREF(signal, req, GetTabInfoRef::InvalidTableId); return; } GetTabInfoConf *conf = (GetTabInfoConf *)&signal->theData[0]; conf->senderData= senderData; conf->tableId= tableId; conf->freeWordsHi= ptr.p->m_free_file_words >> 32; conf->freeWordsLo= ptr.p->m_free_file_words & 0xFFFFFFFF; conf->tableType= DictTabInfo::LogfileGroup; conf->senderRef= reference(); sendSignal(retRef, GSN_GET_TABINFO_CONF, signal, GetTabInfoConf::SignalLength, JBB); } void Lgman::sendGET_TABINFOREF(Signal* signal, GetTabInfoReq * req, GetTabInfoRef::ErrorCode errorCode) { jamEntry(); GetTabInfoRef * const ref = (GetTabInfoRef *)&signal->theData[0]; /** * The format of GetTabInfo Req/Ref is the same */ BlockReference retRef = req->senderRef; ref->errorCode = errorCode; sendSignal(retRef, GSN_GET_TABINFOREF, signal, signal->length(), JBB); }