/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifndef NdbEventOperationImpl_H
#define NdbEventOperationImpl_H

#include <NdbEventOperation.hpp>
#include <signaldata/SumaImpl.hpp>
#include <transporter/TransporterDefinitions.hpp>
#include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#include <UtilBuffer.hpp>

#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4

class NdbEventOperationImpl;

struct EventBufData
{
  union {
    SubTableData *sdata;
    Uint32 *memory;
  };
  LinearSectionPtr ptr[3];
  unsigned sz;
  NdbEventOperationImpl *m_event_op;

  /*
   * Blobs are stored in blob list (m_next_blob) where each entry
   * is list of parts (m_next) in part number order.
   *
   * TODO order by part no and link for fast read and free_list
   */

  EventBufData *m_next; // Next wrt to global order or Next blob part
  EventBufData *m_next_blob; // First part in next blob

  EventBufData *m_next_hash; // Next in per-GCI hash
  Uint32 m_pkhash; // PK hash (without op) for fast compare

  // Get blob part number from blob data
  Uint32 get_blob_part_no() {
    assert(ptr[0].sz > 2);
    Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() +
                AttributeHeader(ptr[0].p[1]).getDataSize();
    Uint32 no = ptr[1].p[pos];
    return no;
  }
};

class EventBufData_list
{
public:
  EventBufData_list();
  ~EventBufData_list();

  void remove_first();
  void append(EventBufData *data);
  void append(const EventBufData_list &list);

  int is_empty();

  EventBufData *m_head, *m_tail;
  unsigned m_count;
  unsigned m_sz;

  // distinct ops per gci (assume no hash needed)
  struct Gci_op { NdbEventOperationImpl* op; Uint32 event_types; };
  Gci_op* m_gci_op_list;
  Uint32 m_gci_op_count;
  Uint32 m_gci_op_alloc;
private:
  void add_gci_op(Gci_op g);
};

inline
EventBufData_list::EventBufData_list()
  : m_head(0), m_tail(0),
    m_count(0),
    m_sz(0),
    m_gci_op_list(NULL),
    m_gci_op_count(0),
    m_gci_op_alloc(0)
{
}

inline
EventBufData_list::~EventBufData_list()
{
  delete [] m_gci_op_list;
}

inline
int EventBufData_list::is_empty()
{
  return m_head == 0;
}

inline
void EventBufData_list::remove_first()
{
  m_count--;
  m_sz-= m_head->sz;
  m_head= m_head->m_next;
  if (m_head == 0)
    m_tail= 0;
}

inline
void EventBufData_list::append(EventBufData *data)
{
  Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation };
  add_gci_op(g);

  data->m_next= 0;
  if (m_tail)
    m_tail->m_next= data;
  else
  {
#ifdef VM_TRACE
    assert(m_count == 0);
    assert(m_sz == 0);
#endif
    m_head= data;
  }
  m_tail= data;

  m_count++;
  m_sz+= data->sz;
}

inline
void EventBufData_list::append(const EventBufData_list &list)
{
  Uint32 i;
  for (i = 0; i < list.m_gci_op_count; i++)
    add_gci_op(list.m_gci_op_list[i]);

  if (m_tail)
    m_tail->m_next= list.m_head;
  else
    m_head= list.m_head;
  m_tail= list.m_tail;
  m_count+= list.m_count;
  m_sz+= list.m_sz;
}

// GCI bucket has also a hash over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
class EventBufData_hash
{
public:
  struct Pos { // search result
    Uint32 index;       // index into hash array
    EventBufData* data; // non-zero if found
    Uint32 pkhash;      // PK hash
  };

  static Uint32 getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
  static bool getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3]);

  void search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3]);
  void append(Pos& hpos, EventBufData* data);

  enum { GCI_EVENT_HASH_SIZE = 101 };
  EventBufData* m_hash[GCI_EVENT_HASH_SIZE];
};

inline
void EventBufData_hash::append(Pos& hpos, EventBufData* data)
{
  data->m_next_hash = m_hash[hpos.index];
  m_hash[hpos.index] = data;
}

struct Gci_container
{
  enum State 
  {
    GC_COMPLETE = 0x1 // GCI is complete, but waiting for out of order
  };
  
  Uint32 m_state;
  Uint32 m_gcp_complete_rep_count; // Remaining SUB_GCP_COMPLETE_REP until done
  Uint64 m_gci;                    // GCI
  EventBufData_list m_data;
  EventBufData_hash m_data_hash;
};

class NdbEventOperationImpl : public NdbEventOperation {
public:
  NdbEventOperationImpl(NdbEventOperation &f,
			Ndb *theNdb, 
			const char* eventName);
  NdbEventOperationImpl(Ndb *theNdb, 
			NdbEventImpl& evnt);
  void init(NdbEventImpl& evnt);
  NdbEventOperationImpl(NdbEventOperationImpl&); //unimplemented
  NdbEventOperationImpl& operator=(const NdbEventOperationImpl&); //unimplemented
  ~NdbEventOperationImpl();

  NdbEventOperation::State getState();

  int execute();
  int execute_nolock();
  int stop();
  NdbRecAttr *getValue(const char *colName, char *aValue, int n);
  NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
  NdbBlob *getBlobHandle(const char *colName, int n);
  NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
  int readBlobParts(char* buf, NdbBlob* blob, Uint32 part, Uint32 count);
  int receive_event();
  const bool tableNameChanged() const;
  const bool tableFrmChanged() const;
  const bool tableFragmentationChanged() const;
  const bool tableRangeListChanged() const;
  Uint64 getGCI();
  Uint64 getLatestGCI();
  bool execSUB_TABLE_DATA(NdbApiSignal * signal, 
                          LinearSectionPtr ptr[3]);

  NdbDictionary::Event::TableEvent getEventType();

  void print();
  void printAll();

  NdbEventOperation *m_facade;
  Uint32 m_magic_number;

  const NdbError & getNdbError() const;
  NdbError m_error;

  Ndb *m_ndb;
  NdbEventImpl *m_eventImpl;

  NdbRecAttr *theFirstPkAttrs[2];
  NdbRecAttr *theCurrentPkAttrs[2];
  NdbRecAttr *theFirstDataAttrs[2];
  NdbRecAttr *theCurrentDataAttrs[2];

  NdbBlob* theBlobList;
  NdbEventOperationImpl* theBlobOpList; // in main op, list of blob ops
  NdbEventOperationImpl* theMainOp; // in blob op, the main op

  NdbEventOperation::State m_state; /* note connection to mi_type */
  Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING
		   * else same as in EventImpl
		   */
  Uint32 m_eventId;
  Uint32 m_oid;

  bool m_mergeEvents;
  
  EventBufData *m_data_item;

  void *m_custom_data;
  int m_has_error;

  Uint32 m_fragmentId;
  UtilBuffer m_buffer;

  // Bit mask for what has changed in a table (for TE_ALTER event)
  Uint32 m_change_mask;

#ifdef VM_TRACE
  Uint32 m_data_done_count;
  Uint32 m_data_count;
#endif

  // managed by the ndb object
  NdbEventOperationImpl *m_next;
  NdbEventOperationImpl *m_prev;
private:
  void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz);
};

class NdbEventBuffer {
public:
  NdbEventBuffer(Ndb*);
  ~NdbEventBuffer();

  const Uint32 &m_system_nodes;
  Vector<Gci_container> m_active_gci;
  NdbEventOperation *createEventOperation(const char* eventName,
					  NdbError &);
  NdbEventOperationImpl *createEventOperation(NdbEventImpl& evnt,
					  NdbError &);
  void dropEventOperation(NdbEventOperation *);
  static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);

  void add_drop_lock();
  void add_drop_unlock();
  void lock();
  void unlock();

  void add_op();
  void remove_op();
  void init_gci_containers();

  // accessed from the "receive thread"
  int insertDataL(NdbEventOperationImpl *op,
		  const SubTableData * const sdata,
		  LinearSectionPtr ptr[3]);
  void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep);
  void complete_outof_order_gcis();
  
  void report_node_failure(Uint32 node_id);
  void completeClusterFailed();

  // used by user thread 
  Uint64 getLatestGCI();
  Uint32 getEventId(int bufferId);

  int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
  NdbEventOperation *nextEvent();
  NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
                                               Uint32* event_types);

  NdbEventOperationImpl *move_data();

  // routines to copy/merge events
  EventBufData* alloc_data();
  int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]);
  int copy_data(const SubTableData * const sdata,
                LinearSectionPtr ptr[3],
                EventBufData* data);
  int merge_data(const SubTableData * const sdata,
                 LinearSectionPtr ptr[3],
                 EventBufData* data);
  int get_main_data(Gci_container* bucket,
                    EventBufData_hash::Pos& hpos,
                    EventBufData* blob_data);
  void add_blob_data(EventBufData* main_data,
                     EventBufData* blob_data);

  void free_list(EventBufData_list &list);

  void reportStatus();

  // Global Mutex used for some things
  static NdbMutex *p_add_drop_mutex;

#ifdef VM_TRACE
  const char *m_latest_command;
#endif

  Ndb *m_ndb;
  Uint64 m_latestGCI;           // latest "handover" GCI
  Uint64 m_latest_complete_GCI; // latest complete GCI (in case of outof order)

  NdbMutex *m_mutex;
  struct NdbCondition *p_cond;

  // receive thread
  Gci_container m_complete_data;
  EventBufData *m_free_data;
#ifdef VM_TRACE
  unsigned m_free_data_count;
#endif
  unsigned m_free_data_sz;

  // user thread
  EventBufData_list m_available_data;
  EventBufData_list m_used_data;

  unsigned m_total_alloc; // total allocated memory

  // threshholds to report status
  unsigned m_free_thresh, m_min_free_thresh, m_max_free_thresh;
  unsigned m_gci_slip_thresh;

  NdbError m_error;
private:
  int expand(unsigned sz);

  // all allocated data
  struct EventBufData_chunk
  {
    unsigned sz;
    EventBufData data[1];
  };
  Vector<EventBufData_chunk *> m_allocated_data;
  unsigned m_sz;

  // dropped event operations that have not yet
  // been deleted
  NdbEventOperationImpl *m_dropped_ev_op;

  Uint32 m_active_op_count;
};

inline
NdbEventOperationImpl*
NdbEventBuffer::getEventOperationImpl(NdbEventOperation* tOp)
{
  return &tOp->m_impl;
}

inline void
NdbEventOperationImpl::receive_data(NdbRecAttr *r,
				    const Uint32 *data,
				    Uint32 sz)
{
  r->receive_data(data,sz);
#if 0
  if (sz)
  {
    assert((r->attrSize() * r->arraySize() + 3) >> 2 == sz);
    r->theNULLind= 0;
    memcpy(r->aRef(), data, 4 * sz);
    return;
  }
  r->theNULLind= 1;
#endif
}

#endif