/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifndef NdbEventOperationImpl_H #define NdbEventOperationImpl_H #include <NdbEventOperation.hpp> #include <signaldata/SumaImpl.hpp> #include <transporter/TransporterDefinitions.hpp> #include <NdbRecAttr.hpp> #include <AttributeHeader.hpp> #include <UtilBuffer.hpp> #define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4 class NdbEventOperationImpl; struct EventBufData { union { SubTableData *sdata; Uint32 *memory; }; LinearSectionPtr ptr[3]; unsigned sz; NdbEventOperationImpl *m_event_op; /* * Blobs are stored in blob list (m_next_blob) where each entry * is list of parts (m_next) in part number order. * * TODO order by part no and link for fast read and free_list */ EventBufData *m_next; // Next wrt to global order or Next blob part EventBufData *m_next_blob; // First part in next blob EventBufData *m_next_hash; // Next in per-GCI hash Uint32 m_pkhash; // PK hash (without op) for fast compare // Get blob part number from blob data Uint32 get_blob_part_no() { assert(ptr[0].sz > 2); Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() + AttributeHeader(ptr[0].p[1]).getDataSize(); Uint32 no = ptr[1].p[pos]; return no; } }; class EventBufData_list { public: EventBufData_list(); ~EventBufData_list(); void remove_first(); void append(EventBufData *data); void append(const EventBufData_list &list); int is_empty(); EventBufData *m_head, *m_tail; unsigned m_count; unsigned m_sz; // distinct ops per gci (assume no hash needed) struct Gci_op { NdbEventOperationImpl* op; Uint32 event_types; }; Gci_op* m_gci_op_list; Uint32 m_gci_op_count; Uint32 m_gci_op_alloc; private: void add_gci_op(Gci_op g); }; inline EventBufData_list::EventBufData_list() : m_head(0), m_tail(0), m_count(0), m_sz(0), m_gci_op_list(NULL), m_gci_op_count(0), m_gci_op_alloc(0) { } inline EventBufData_list::~EventBufData_list() { delete [] m_gci_op_list; } inline int EventBufData_list::is_empty() { return m_head == 0; } inline void EventBufData_list::remove_first() { m_count--; m_sz-= m_head->sz; m_head= m_head->m_next; if (m_head == 0) m_tail= 0; } inline void EventBufData_list::append(EventBufData *data) { Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation }; add_gci_op(g); data->m_next= 0; if (m_tail) m_tail->m_next= data; else { #ifdef VM_TRACE assert(m_count == 0); assert(m_sz == 0); #endif m_head= data; } m_tail= data; m_count++; m_sz+= data->sz; } inline void EventBufData_list::append(const EventBufData_list &list) { Uint32 i; for (i = 0; i < list.m_gci_op_count; i++) add_gci_op(list.m_gci_op_list[i]); if (m_tail) m_tail->m_next= list.m_head; else m_head= list.m_head; m_tail= list.m_tail; m_count+= list.m_count; m_sz+= list.m_sz; } // GCI bucket has also a hash over data, with key event op, table PK. // It can only be appended to and is invalid after remove_first(). class EventBufData_hash { public: struct Pos { // search result Uint32 index; // index into hash array EventBufData* data; // non-zero if found Uint32 pkhash; // PK hash }; static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]); static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]); void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]); void append(Pos& hpos, EventBufData* data); enum { GCI_EVENT_HASH_SIZE = 101 }; EventBufData* m_hash[GCI_EVENT_HASH_SIZE]; }; inline void EventBufData_hash::append(Pos& hpos, EventBufData* data) { data->m_next_hash = m_hash[hpos.index]; m_hash[hpos.index] = data; } struct Gci_container { enum State { GC_COMPLETE = 0x1 // GCI is complete, but waiting for out of order }; Uint32 m_state; Uint32 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done Uint64 m_gci; // GCI EventBufData_list m_data; EventBufData_hash m_data_hash; }; class NdbEventOperationImpl : public NdbEventOperation { public: NdbEventOperationImpl(NdbEventOperation &f, Ndb *theNdb, const char* eventName); NdbEventOperationImpl(Ndb *theNdb, NdbEventImpl& evnt); void init(NdbEventImpl& evnt); NdbEventOperationImpl(NdbEventOperationImpl&); //unimplemented NdbEventOperationImpl& operator=(const NdbEventOperationImpl&); //unimplemented ~NdbEventOperationImpl(); NdbEventOperation::State getState(); int execute(); int execute_nolock(); int stop(); NdbRecAttr *getValue(const char *colName, char *aValue, int n); NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n); NdbBlob *getBlobHandle(const char *colName, int n); NdbBlob *getBlobHandle(const NdbColumnImpl *, int n); int readBlobParts(char* buf, NdbBlob* blob, Uint32 part, Uint32 count); int receive_event(); const bool tableNameChanged() const; const bool tableFrmChanged() const; const bool tableFragmentationChanged() const; const bool tableRangeListChanged() const; Uint64 getGCI(); Uint64 getLatestGCI(); bool execSUB_TABLE_DATA(NdbApiSignal * signal, LinearSectionPtr ptr[3]); NdbDictionary::Event::TableEvent getEventType(); void print(); void printAll(); NdbEventOperation *m_facade; Uint32 m_magic_number; const NdbError & getNdbError() const; NdbError m_error; Ndb *m_ndb; NdbEventImpl *m_eventImpl; NdbRecAttr *theFirstPkAttrs[2]; NdbRecAttr *theCurrentPkAttrs[2]; NdbRecAttr *theFirstDataAttrs[2]; NdbRecAttr *theCurrentDataAttrs[2]; NdbBlob* theBlobList; NdbEventOperationImpl* theBlobOpList; // in main op, list of blob ops NdbEventOperationImpl* theMainOp; // in blob op, the main op NdbEventOperation::State m_state; /* note connection to mi_type */ Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING * else same as in EventImpl */ Uint32 m_eventId; Uint32 m_oid; bool m_mergeEvents; EventBufData *m_data_item; void *m_custom_data; int m_has_error; Uint32 m_fragmentId; UtilBuffer m_buffer; // Bit mask for what has changed in a table (for TE_ALTER event) Uint32 m_change_mask; #ifdef VM_TRACE Uint32 m_data_done_count; Uint32 m_data_count; #endif // managed by the ndb object NdbEventOperationImpl *m_next; NdbEventOperationImpl *m_prev; private: void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz); }; class NdbEventBuffer { public: NdbEventBuffer(Ndb*); ~NdbEventBuffer(); const Uint32 &m_system_nodes; Vector<Gci_container> m_active_gci; NdbEventOperation *createEventOperation(const char* eventName, NdbError &); NdbEventOperationImpl *createEventOperation(NdbEventImpl& evnt, NdbError &); void dropEventOperation(NdbEventOperation *); static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp); void add_drop_lock(); void add_drop_unlock(); void lock(); void unlock(); void add_op(); void remove_op(); void init_gci_containers(); // accessed from the "receive thread" int insertDataL(NdbEventOperationImpl *op, const SubTableData * const sdata, LinearSectionPtr ptr[3]); void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep); void complete_outof_order_gcis(); void report_node_failure(Uint32 node_id); void completeClusterFailed(); // used by user thread Uint64 getLatestGCI(); Uint32 getEventId(int bufferId); int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0); NdbEventOperation *nextEvent(); NdbEventOperationImpl* getGCIEventOperations(Uint32* iter, Uint32* event_types); NdbEventOperationImpl *move_data(); // routines to copy/merge events EventBufData* alloc_data(); int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]); int copy_data(const SubTableData * const sdata, LinearSectionPtr ptr[3], EventBufData* data); int merge_data(const SubTableData * const sdata, LinearSectionPtr ptr[3], EventBufData* data); int get_main_data(Gci_container* bucket, EventBufData_hash::Pos& hpos, EventBufData* blob_data); void add_blob_data(EventBufData* main_data, EventBufData* blob_data); void free_list(EventBufData_list &list); void reportStatus(); // Global Mutex used for some things static NdbMutex *p_add_drop_mutex; #ifdef VM_TRACE const char *m_latest_command; #endif Ndb *m_ndb; Uint64 m_latestGCI; // latest "handover" GCI Uint64 m_latest_complete_GCI; // latest complete GCI (in case of outof order) NdbMutex *m_mutex; struct NdbCondition *p_cond; // receive thread Gci_container m_complete_data; EventBufData *m_free_data; #ifdef VM_TRACE unsigned m_free_data_count; #endif unsigned m_free_data_sz; // user thread EventBufData_list m_available_data; EventBufData_list m_used_data; unsigned m_total_alloc; // total allocated memory // threshholds to report status unsigned m_free_thresh, m_min_free_thresh, m_max_free_thresh; unsigned m_gci_slip_thresh; NdbError m_error; private: int expand(unsigned sz); // all allocated data struct EventBufData_chunk { unsigned sz; EventBufData data[1]; }; Vector<EventBufData_chunk *> m_allocated_data; unsigned m_sz; // dropped event operations that have not yet // been deleted NdbEventOperationImpl *m_dropped_ev_op; Uint32 m_active_op_count; }; inline NdbEventOperationImpl* NdbEventBuffer::getEventOperationImpl(NdbEventOperation* tOp) { return &tOp->m_impl; } inline void NdbEventOperationImpl::receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz) { r->receive_data(data,sz); #if 0 if (sz) { assert((r->attrSize() * r->arraySize() + 3) >> 2 == sz); r->theNULLind= 0; memcpy(r->aRef(), data, 4 * sz); return; } r->theNULLind= 1; #endif } #endif