/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <NDBT_Test.hpp>
#include <NDBT_ReturnCodes.h>
#include <HugoTransactions.hpp>
#include <UtilTransactions.hpp>
#include <TestNdbEventOperation.hpp>
#include <NdbAutoPtr.hpp>
#include <NdbRestarter.hpp>
#include <NdbRestarts.hpp>

#define GETNDB(ps) ((NDBT_NdbApiStep*)ps)->getNdb()

static int createEvent(Ndb *pNdb, const NdbDictionary::Table &tab)
{
  char eventName[1024];
  sprintf(eventName,"%s_EVENT",tab.getName());

  NdbDictionary::Dictionary *myDict = pNdb->getDictionary();

  if (!myDict) {
    g_err << "Dictionary not found " 
	  << pNdb->getNdbError().code << " "
	  << pNdb->getNdbError().message << endl;
    return NDBT_FAILED;
  }

  NdbDictionary::Event myEvent(eventName);
  myEvent.setTable(tab.getName());
  myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); 
  for(int a = 0; a < tab.getNoOfColumns(); a++){
    myEvent.addEventColumn(a);
  }

  int res = myDict->createEvent(myEvent); // Add event to database

  if (res == 0)
    myEvent.print();
  else if (myDict->getNdbError().classification ==
	   NdbError::SchemaObjectExists) 
  {
    g_info << "Event creation failed event exists\n";
    res = myDict->dropEvent(eventName);
    if (res) {
      g_err << "Failed to drop event: " 
	    << myDict->getNdbError().code << " : "
	    << myDict->getNdbError().message << endl;
      return NDBT_FAILED;
    }
    // try again
    res = myDict->createEvent(myEvent); // Add event to database
    if (res) {
      g_err << "Failed to create event (1): " 
	    << myDict->getNdbError().code << " : "
	    << myDict->getNdbError().message << endl;
      return NDBT_FAILED;
    }
  }
  else 
  {
    g_err << "Failed to create event (2): " 
	  << myDict->getNdbError().code << " : "
	  << myDict->getNdbError().message << endl;
    return NDBT_FAILED;
  }

  return NDBT_OK;
}

static int dropEvent(Ndb *pNdb, const NdbDictionary::Table &tab)
{
  char eventName[1024];
  sprintf(eventName,"%s_EVENT",tab.getName());
  NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
  if (!myDict) {
    g_err << "Dictionary not found " 
	  << pNdb->getNdbError().code << " "
	  << pNdb->getNdbError().message << endl;
    return NDBT_FAILED;
  }
  if (myDict->dropEvent(eventName)) {
    g_err << "Failed to drop event: " 
	  << myDict->getNdbError().code << " : "
	  << myDict->getNdbError().message << endl;
    return NDBT_FAILED;
  }
  return NDBT_OK;
}
 
static int runCreateEvent(NDBT_Context* ctx, NDBT_Step* step)
{
  if (createEvent(GETNDB(step),* ctx->getTab()) != 0){
    return NDBT_FAILED;
  }
  return NDBT_OK;
}

struct receivedEvent {
  Uint32 pk;
  Uint32 count;
  Uint32 event;
};

static int 
eventOperation(Ndb* pNdb, const NdbDictionary::Table &tab, void* pstats, int records)
{
  int i;
  const char function[] = "HugoTransactions::eventOperation: ";
  struct receivedEvent* recInsertEvent;
  NdbAutoObjArrayPtr<struct receivedEvent>
    p00( recInsertEvent = new struct receivedEvent[3*records] );
  struct receivedEvent* recUpdateEvent = &recInsertEvent[records];
  struct receivedEvent* recDeleteEvent = &recInsertEvent[2*records];

  EventOperationStats &stats = *(EventOperationStats*)pstats;

  stats.n_inserts = 0;
  stats.n_deletes = 0;
  stats.n_updates = 0;
  stats.n_consecutive = 0;
  stats.n_duplicates = 0;
  stats.n_inconsistent_gcis = 0;

  for (i = 0; i < records; i++) {
    recInsertEvent[i].pk    = 0xFFFFFFFF;
    recInsertEvent[i].count = 0;
    recInsertEvent[i].event = 0xFFFFFFFF;

    recUpdateEvent[i].pk    = 0xFFFFFFFF;
    recUpdateEvent[i].count = 0;
    recUpdateEvent[i].event = 0xFFFFFFFF;

    recDeleteEvent[i].pk    = 0xFFFFFFFF;
    recDeleteEvent[i].count = 0;
    recDeleteEvent[i].event = 0xFFFFFFFF;
  }

  NdbDictionary::Dictionary *myDict = pNdb->getDictionary();

  if (!myDict) {
    g_err << function << "Event Creation failedDictionary not found\n";
    return NDBT_FAILED;
  }

  int                  r = 0;
  NdbEventOperation    *pOp;

  char eventName[1024];
  sprintf(eventName,"%s_EVENT",tab.getName());
  Uint32 noEventColumnName = tab.getNoOfColumns();

  g_info << function << "create EventOperation\n";
  pOp = pNdb->createEventOperation(eventName);
  if ( pOp == NULL ) {
    g_err << function << "Event operation creation failed\n";
    return NDBT_FAILED;
  }

  g_info << function << "get values\n";
  NdbRecAttr* recAttr[1024];
  NdbRecAttr* recAttrPre[1024];

  const NdbDictionary::Table *_table = myDict->getTable(tab.getName());

  for (int a = 0; a < noEventColumnName; a++) {
    recAttr[a]    = pOp->getValue(_table->getColumn(a)->getName());
    recAttrPre[a] = pOp->getPreValue(_table->getColumn(a)->getName());
  }
  
  // set up the callbacks
  g_info << function << "execute\n";
  if (pOp->execute()) { // This starts changes to "start flowing"
    g_err << function << "operation execution failed: \n";
    g_err << pOp->getNdbError().code << " "
	  << pOp->getNdbError().message << endl;
    return NDBT_FAILED;
  }

  g_info << function << "ok\n";

  int count = 0;
  Uint32 last_inconsitant_gci = 0xEFFFFFF0;

  while (r < records){
    //printf("now waiting for event...\n");
    int res = pNdb->pollEvents(1000); // wait for event or 1000 ms

    if (res > 0) {
      //printf("got data! %d\n", r);
      NdbEventOperation *tmp;
      while ((tmp= pNdb->nextEvent()))
      {
	assert(tmp == pOp);
	r++;
	count++;

	Uint32 gci = pOp->getGCI();
	Uint32 pk = recAttr[0]->u_32_value();

        if (!pOp->isConsistent()) {
	  if (last_inconsitant_gci != gci) {
	    last_inconsitant_gci = gci;
	    stats.n_inconsistent_gcis++;
	  }
	  g_warning << "A node failure has occured and events might be missing\n";	
	}
	g_info << function << "GCI " << gci << ": " << count;
	struct receivedEvent* recEvent;
	switch (pOp->getEventType()) {
	case NdbDictionary::Event::TE_INSERT:
	  stats.n_inserts++;
	  g_info << " INSERT: ";
	  recEvent = recInsertEvent;
	  break;
	case NdbDictionary::Event::TE_DELETE:
	  stats.n_deletes++;
	  g_info << " DELETE: ";
	  recEvent = recDeleteEvent;
	  break;
	case NdbDictionary::Event::TE_UPDATE:
	  stats.n_updates++;
	  g_info << " UPDATE: ";
	  recEvent = recUpdateEvent;
	  break;
	default:
	case NdbDictionary::Event::TE_ALL:
	  abort();
	}

	if ((int)pk < records) {
	  recEvent[pk].pk = pk;
	  recEvent[pk].count++;
	}

	for (i = 1; i < noEventColumnName; i++) {
	  if (recAttr[i]->isNULL() >= 0) { // we have a value
	    g_info << " post[" << i << "]=";
	    if (recAttr[i]->isNULL() == 0) // we have a non-null value
	      g_info << recAttr[i]->u_32_value();
	    else                           // we have a null value
	      g_info << "NULL";
	  }
	  if (recAttrPre[i]->isNULL() >= 0) { // we have a value
	    g_info << " pre[" << i << "]=";
	    if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
	      g_info << recAttrPre[i]->u_32_value();
	    else                              // we have a null value
	      g_info << "NULL";
	  }
	}
	g_info << endl;
      }
    } else
      ;//printf("timed out\n");
  }

  g_info << "dropping event operation" << endl;

  int res = pNdb->dropEventOperation(pOp);
  if (res != 0) {
    g_err << "operation execution failed\n";
    return NDBT_FAILED;
  }

  g_info << " ok" << endl;

  if (stats.n_inserts > 0) {
    stats.n_consecutive++;
  }
  if (stats.n_deletes > 0) {
    stats.n_consecutive++;
  }
  if (stats.n_updates > 0) {
    stats.n_consecutive++;
  }
  for (i = 0; i < (Uint32)records/3; i++) {
    if (recInsertEvent[i].pk != i) {
      stats.n_consecutive ++;
      ndbout << "missing insert pk " << i << endl;
    } else if (recInsertEvent[i].count > 1) {
      ndbout << "duplicates insert pk " << i
	     << " count " << recInsertEvent[i].count << endl;
      stats.n_duplicates += recInsertEvent[i].count-1;
    }
    if (recUpdateEvent[i].pk != i) {
      stats.n_consecutive ++;
      ndbout << "missing update pk " << i << endl;
    } else if (recUpdateEvent[i].count > 1) {
      ndbout << "duplicates update pk " << i
	     << " count " << recUpdateEvent[i].count << endl;
      stats.n_duplicates += recUpdateEvent[i].count-1;
    }
    if (recDeleteEvent[i].pk != i) {
      stats.n_consecutive ++;
      ndbout << "missing delete pk " << i << endl;
    } else if (recDeleteEvent[i].count > 1) {
      ndbout << "duplicates delete pk " << i
	     << " count " << recDeleteEvent[i].count << endl;
      stats.n_duplicates += recDeleteEvent[i].count-1;
    }
  }
  
  return NDBT_OK;
}

int runCreateShadowTable(NDBT_Context* ctx, NDBT_Step* step)
{
  const NdbDictionary::Table *table= ctx->getTab();
  char buf[1024];
  sprintf(buf, "%s_SHADOW", table->getName());

  GETNDB(step)->getDictionary()->dropTable(buf);
  if (GETNDB(step)->getDictionary()->getTable(buf))
  {
    g_err << "unsucessful drop of " << buf << endl;
    return NDBT_FAILED;
  }

  NdbDictionary::Table table_shadow(*table);
  table_shadow.setName(buf);
  // TODO should be removed
  // This should work wo/ next line
  //table_shadow.setNodeGroupIds(0, 0);
  GETNDB(step)->getDictionary()->createTable(table_shadow);
  if (GETNDB(step)->getDictionary()->getTable(buf))
    return NDBT_OK;

  g_err << "unsucessful create of " << buf << endl;
  return NDBT_FAILED;
}

int runDropShadowTable(NDBT_Context* ctx, NDBT_Step* step)
{
  const NdbDictionary::Table *table= ctx->getTab();
  char buf[1024];
  sprintf(buf, "%s_SHADOW", table->getName());
  
  GETNDB(step)->getDictionary()->dropTable(buf);
  return NDBT_OK;
}

int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step)
{
  int loops = ctx->getNumLoops();
  //int records = ctx->getNumRecords();
  HugoTransactions hugoTrans(*ctx->getTab());
  EventOperationStats stats;

  //Ndb *pNdb=GETNDB(step);
  const NdbDictionary::Table& tab= *ctx->getTab();
  //NdbEventOperation    *pOp;
  char eventName[1024];
  sprintf(eventName,"%s_EVENT",tab.getName());
  //int noEventColumnName = tab.getNoOfColumns();

  for (int i= 0; i < loops; i++)
  {
#if 1
    if (eventOperation(GETNDB(step), tab, (void*)&stats, 0) != 0){
      return NDBT_FAILED;
    }
#else
    g_info << "create EventOperation\n";
    pOp = pNdb->createEventOperation(eventName);
    if ( pOp == NULL ) {
      g_err << "Event operation creation failed\n";
      return NDBT_FAILED;
    }

    g_info << "dropping event operation" << endl;
    int res = pNdb->dropEventOperation(pOp);
    if (res != 0) {
      g_err << "operation execution failed\n";
      return NDBT_FAILED;
    }
#endif
  }

  return NDBT_OK;
}

int theThreadIdCounter = 0;

int runEventOperation(NDBT_Context* ctx, NDBT_Step* step)
{
  int tId = theThreadIdCounter++;
  //int loops = ctx->getNumLoops();
  int records = ctx->getNumRecords();
  HugoTransactions hugoTrans(*ctx->getTab());

  EventOperationStats stats;

  g_info << "***** start Id " << tId << endl;

  //  sleep(tId);

  if (eventOperation(GETNDB(step), *ctx->getTab(), (void*)&stats, 3*records) != 0){
    return NDBT_FAILED;
  }

  int ret;
  if (stats.n_inserts     == records &&
      stats.n_deletes     == records &&
      stats.n_updates     == records &&
      stats.n_consecutive == 3 &&
      stats.n_duplicates  == 0)
    ret = NDBT_OK;
  else
    ret = NDBT_FAILED;

  if (ret == NDBT_FAILED) {
    g_info << "***** end Id " << tId << endl;
    ndbout_c("n_inserts =           %d (%d)", stats.n_inserts, records);
    ndbout_c("n_deletes =           %d (%d)", stats.n_deletes, records);
    ndbout_c("n_updates =           %d (%d)", stats.n_updates, records);
    ndbout_c("n_consecutive =       %d (%d)", stats.n_consecutive, 3);
    ndbout_c("n_duplicates =        %d (%d)", stats.n_duplicates, 0);
    ndbout_c("n_inconsistent_gcis = %d (%d)", stats.n_inconsistent_gcis, 0);
  }

  return ret;
}

int runEventLoad(NDBT_Context* ctx, NDBT_Step* step)
{
  int loops = ctx->getNumLoops();
  int records = ctx->getNumRecords();
  HugoTransactions hugoTrans(*ctx->getTab());

  sleep(1);
#if 0
  sleep(5);
  sleep(theThreadIdCounter);
#endif
  if (hugoTrans.loadTable(GETNDB(step), records, 1, true, loops) != 0){
    return NDBT_FAILED;
  }
  if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, loops) != 0){
    return NDBT_FAILED;
  }
  if (hugoTrans.pkDelRecords(GETNDB(step),  records, 1, true, loops) != 0){
    return NDBT_FAILED;
  }
  return NDBT_OK;
}

int runEventMixedLoad(NDBT_Context* ctx, NDBT_Step* step)
{
  int loops = ctx->getNumLoops();
  int records = ctx->getNumRecords();
  HugoTransactions hugoTrans(*ctx->getTab());
  
  while(loops -- && !ctx->isTestStopped())
  {
    hugoTrans.clearTable(GETNDB(step), 0);

    if (hugoTrans.loadTable(GETNDB(step), 3*records, 1, true, 1) != 0){
      g_err << "FAIL " << __LINE__ << endl;
      return NDBT_FAILED;
    }

    if (hugoTrans.pkDelRecords(GETNDB(step), 3*records, 1, true, 1) != 0){
      g_err << "FAIL " << __LINE__ << endl;
      return NDBT_FAILED;
    }
    if (hugoTrans.loadTable(GETNDB(step), records, 1, true, 1) != 0){
      g_err << "FAIL " << __LINE__ << endl;
      return NDBT_FAILED;
    }
    if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, 1) != 0){
      g_err << "FAIL " << __LINE__ << endl;
      return NDBT_FAILED;
    }
    if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, 1) != 0){
      g_err << "FAIL " << __LINE__ << endl;
      return NDBT_FAILED;
    }
    if (hugoTrans.pkUpdateRecords(GETNDB(step), records, 1, 1) != 0){
      g_err << "FAIL " << __LINE__ << endl;
      return NDBT_FAILED;
    }

    ctx->setProperty("LastGCI", hugoTrans.m_latest_gci);
    if(ctx->getPropertyWait("LastGCI", ~(Uint32)0))
    {
      g_err << "FAIL " << __LINE__ << endl;
      return NDBT_FAILED;
    }
  }
  ctx->stopTest();  
  return NDBT_OK;
}

int runDropEvent(NDBT_Context* ctx, NDBT_Step* step)
{
  return NDBT_OK;
}

int runVerify(NDBT_Context* ctx, NDBT_Step* step)
{
  const NdbDictionary::Table * table= ctx->getTab();
  char buf[1024];

  sprintf(buf, "%s_SHADOW", table->getName());

  HugoTransactions hugoTrans(*table);
  if (hugoTrans.compare(GETNDB(step), buf, 0))
  {
    return NDBT_FAILED;
  }

  return NDBT_OK;
}

int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("runEventApplier");

  int result = NDBT_OK;
  const NdbDictionary::Table * table= ctx->getTab();
  HugoTransactions hugoTrans(* table);

  char shadow[1024], buf[1024];
  sprintf(shadow, "%s_SHADOW", table->getName());
  const NdbDictionary::Table * table_shadow;
  if ((table_shadow = GETNDB(step)->getDictionary()->getTable(shadow)) == 0)
  {
    g_err << "Unable to get table " << shadow << endl;
    DBUG_RETURN(NDBT_FAILED);
  }
  
  sprintf(buf, "%s_EVENT", table->getName());
  NdbEventOperation *pOp, *pCreate = 0;
  pCreate = pOp = GETNDB(step)->createEventOperation(buf);
  if ( pOp == NULL ) {
    g_err << "Event operation creation failed on %s" << buf << endl;
    DBUG_RETURN(NDBT_FAILED);
  }

  int i;
  int n_columns= table->getNoOfColumns();
  NdbRecAttr* recAttr[1024];
  NdbRecAttr* recAttrPre[1024];
  for (i = 0; i < n_columns; i++) {
    recAttr[i]    = pOp->getValue(table->getColumn(i)->getName());
    recAttrPre[i] = pOp->getPreValue(table->getColumn(i)->getName());
  }

  if (pOp->execute()) { // This starts changes to "start flowing"
    g_err << "execute operation execution failed: \n";
    g_err << pOp->getNdbError().code << " "
	  << pOp->getNdbError().message << endl;
    result = NDBT_FAILED;
    goto end;
  }

  while(!ctx->isTestStopped())
  {
    int r;
    int count= 0;
    Uint32 stop_gci= ~0;
    Uint64 curr_gci = 0;
    Ndb* ndb= GETNDB(step);

    while(!ctx->isTestStopped() && curr_gci <= stop_gci)
    {
      ndb->pollEvents(100, &curr_gci);
      while ((pOp= ndb->nextEvent()) != 0)
      {
	assert(pOp == pCreate);
	int noRetries= 0;
	do
	{
	  NdbTransaction *trans= GETNDB(step)->startTransaction();
	  if (trans == 0)
	  {
	    g_err << "startTransaction failed "
		  << GETNDB(step)->getNdbError().code << " "
		  << GETNDB(step)->getNdbError().message << endl;
	    result = NDBT_FAILED;
	    goto end;

	  }
	
	  NdbOperation *op= trans->getNdbOperation(table_shadow);
	  if (op == 0)
	  {
	    g_err << "getNdbOperation failed "
		  << trans->getNdbError().code << " "
		  << trans->getNdbError().message << endl;
	    result = NDBT_FAILED;
	    goto end;

	  }
	
	  switch (pOp->getEventType()) {
	  case NdbDictionary::Event::TE_INSERT:
	    if (op->writeTuple())
	    {
	      g_err << "insertTuple "
		    << op->getNdbError().code << " "
		    << op->getNdbError().message << endl;
	      result = NDBT_FAILED;
	      goto end;

	    }
	    break;
	  case NdbDictionary::Event::TE_DELETE:
	    if (op->deleteTuple())
	    {
	      g_err << "deleteTuple "
		    << op->getNdbError().code << " "
		    << op->getNdbError().message << endl;
	      result = NDBT_FAILED;
	      goto end;

	    }
	    break;
	  case NdbDictionary::Event::TE_UPDATE:
	    if (op->writeTuple())
	    {
	      g_err << "updateTuple "
		    << op->getNdbError().code << " "
		    << op->getNdbError().message << endl;
	      result = NDBT_FAILED;
	      goto end;

	    }
	    break;
	  default:
	    abort();
	  }

	  for (i= 0; i < n_columns; i++)
	  {
	    if (recAttr[i]->isNULL())
	    {
	      if (table->getColumn(i)->getPrimaryKey())
	      {
		g_err << "internal error: primary key isNull()="
		      << recAttr[i]->isNULL() << endl;
		result = NDBT_FAILED;
		goto end;

	      }
	      switch (pOp->getEventType()) {
	      case NdbDictionary::Event::TE_INSERT:
		if (recAttr[i]->isNULL() < 0)
		{
		  g_err << "internal error: missing value for insert\n";
		  result = NDBT_FAILED;
		  goto end;

		}
		break;
	      case NdbDictionary::Event::TE_DELETE:
		break;
	      case NdbDictionary::Event::TE_UPDATE:
		break;
	      default:
		abort();
	      }
	    }
	    if (table->getColumn(i)->getPrimaryKey() &&
		op->equal(i,recAttr[i]->aRef()))
	    {
	      g_err << "equal " << i << " "
		    << op->getNdbError().code << " "
		    << op->getNdbError().message << endl;
	      result = NDBT_FAILED;
	      goto end;

	    }
	  }
	
	  switch (pOp->getEventType()) {
	  case NdbDictionary::Event::TE_INSERT:
	    for (i= 0; i < n_columns; i++)
	    {
	      if (!table->getColumn(i)->getPrimaryKey() &&
		  op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef()))
	      {
		g_err << "setValue(insert) " << i << " "
		      << op->getNdbError().code << " "
		      << op->getNdbError().message << endl;
		result = NDBT_FAILED;
		goto end;

	      }
	    }
	    break;
	  case NdbDictionary::Event::TE_DELETE:
	    break;
	  case NdbDictionary::Event::TE_UPDATE:
	    for (i= 0; i < n_columns; i++)
	    {
	      if (!table->getColumn(i)->getPrimaryKey() &&
		  recAttr[i]->isNULL() >= 0 &&
		  op->setValue(i,recAttr[i]->isNULL() ? 0:recAttr[i]->aRef()))
	      {
		g_err << "setValue(update) " << i << " "
		      << op->getNdbError().code << " "
		      << op->getNdbError().message << endl;
		result = NDBT_FAILED;
		goto end;

	      }
	    }
	    break;
	  default:
	  case NdbDictionary::Event::TE_ALL:
	    abort();
	  }
	  if (trans->execute(Commit) == 0)
	  {
	    trans->close();
	    count++;
	    // everything ok
	    break;
	  }

	  if (trans->getNdbError().status == NdbError::PermanentError)
	  {
	    g_err << "Ignoring execute " << r << " failed "
		  << trans->getNdbError().code << " "
		  << trans->getNdbError().message << endl;
	  
	    trans->close();
	    count++;
	    break;
	  }
	  else if (noRetries++ == 10)
	  {
	    g_err << "execute " << r << " failed "
		  << trans->getNdbError().code << " "
		  << trans->getNdbError().message << endl;
	    trans->close();
	    result = NDBT_FAILED;
	    goto end;

	  }
	  trans->close();
	  NdbSleep_MilliSleep(100); // sleep before retying
	} while(1);
      }
      stop_gci = ctx->getProperty("LastGCI", ~(Uint32)0);
    } 
    
    ndbout_c("Applied gci: %d, %d events", stop_gci, count);
    if (hugoTrans.compare(GETNDB(step), shadow, 0))
    {
      g_err << "compare failed" << endl;
      result = NDBT_FAILED;
      goto end;
    }
    ctx->setProperty("LastGCI", ~(Uint32)0);
    ctx->broadcast();
  }
  
end:
  if(pCreate)
  {
    if (GETNDB(step)->dropEventOperation(pCreate)) {
      g_err << "dropEventOperation execution failed "
	    << GETNDB(step)->getNdbError().code << " "
	    << GETNDB(step)->getNdbError().message << endl;
      result = NDBT_FAILED;
    }
  }
  ctx->stopTest();
  DBUG_RETURN(result);
}

int runRestarter(NDBT_Context* ctx, NDBT_Step* step){
  int result = NDBT_OK;
  int loops = ctx->getNumLoops();
  NdbRestarter restarter;
  int i = 0;
  int lastId = 0;

  if (restarter.getNumDbNodes() < 2){
    ctx->stopTest();
    return NDBT_OK;
  }

  if(restarter.waitClusterStarted(60) != 0){
    g_err << "Cluster failed to start" << endl;
    return NDBT_FAILED;
  }
  
  while(result != NDBT_FAILED && !ctx->isTestStopped()){

    int id = lastId % restarter.getNumDbNodes();
    int nodeId = restarter.getDbNodeId(id);
    ndbout << "Restart node " << nodeId << endl; 
    if(restarter.restartOneDbNode(nodeId, false, false, true) != 0){
      g_err << "Failed to restartNextDbNode" << endl;
      result = NDBT_FAILED;
      break;
    }    

    if(restarter.waitClusterStarted(60) != 0){
      g_err << "Cluster failed to start" << endl;
      result = NDBT_FAILED;
      break;
    }
    
    lastId++;
    i++;
  }

  return result;
}

Vector<const NdbDictionary::Table*> pTabs;
Vector<const NdbDictionary::Table*> pShadowTabs;

static int getAllTables(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("getAllTables");
  Ndb * ndb= GETNDB(step);
  NdbDictionary::Dictionary * dict = ndb->getDictionary();
  pTabs.clear();

  for (int i= 0; i < ctx->getNumTables(); i++)
  {
    const NdbDictionary::Table *pTab= dict->getTable(ctx->getTableName(i));
    if (pTab == 0)
    {
      ndbout << "Failed to get table" << endl;
      ndbout << dict->getNdbError() << endl;
      DBUG_RETURN(NDBT_FAILED);
    }
    pTabs.push_back(pTab);
    ndbout << " " << ctx->getTableName(i);
  }
  pTabs.push_back(NULL);
  ndbout << endl;

  DBUG_RETURN(NDBT_OK);
}

static int createAllEvents(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("createAllEvents");
  Ndb * ndb= GETNDB(step);
  for (int i= 0; pTabs[i]; i++)
  {
    if (createEvent(ndb,*pTabs[i]))
    {
      DBUG_RETURN(NDBT_FAILED);
    }
  }
  DBUG_RETURN(NDBT_OK);
}

static int dropAllEvents(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("createAllEvents");
  Ndb * ndb= GETNDB(step);
  int i;

  for (i= 0; pTabs[i]; i++)
  {
    if (dropEvent(ndb,*pTabs[i]))
    {
      DBUG_RETURN(NDBT_FAILED);
    }
  }
  DBUG_RETURN(NDBT_OK);
}

static int createAllShadows(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("createAllShadows");
  Ndb * ndb= GETNDB(step);
  NdbDictionary::Dictionary * dict = ndb->getDictionary();
  // create a "shadow" table for each table
  for (int i= 0; pTabs[i]; i++)
  {
    char buf[1024];
    sprintf(buf, "%s_SHADOW", pTabs[i]->getName());

    dict->dropTable(buf);
    if (dict->getTable(buf))
    {
      DBUG_RETURN(NDBT_FAILED);
    }

    NdbDictionary::Table table_shadow(*pTabs[i]);
    table_shadow.setName(buf);
    if (dict->createTable(table_shadow))
    {
      g_err << "createTable(" << buf << ") "
	    << dict->getNdbError().code << " "
	    << dict->getNdbError().message << endl;
      DBUG_RETURN(NDBT_FAILED);
    }
    pShadowTabs.push_back(dict->getTable(buf));
    if (!pShadowTabs[i])
    {
      g_err << "getTable(" << buf << ") "
	    << dict->getNdbError().code << " "
	    << dict->getNdbError().message << endl;
      DBUG_RETURN(NDBT_FAILED);
    }
  }
  DBUG_RETURN(NDBT_OK);
}

static int dropAllShadows(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("dropAllShadows");
  Ndb * ndb= GETNDB(step);
  NdbDictionary::Dictionary * dict = ndb->getDictionary();

  for (int i= 0; pTabs[i]; i++)
  {
    char buf[1024];
    sprintf(buf, "%s_SHADOW", pTabs[i]->getName());
    if (dict->dropTable(buf))
    {
      DBUG_RETURN(NDBT_FAILED);
    }
  }  
  DBUG_RETURN(NDBT_OK);
}

static int start_transaction(Ndb *ndb, Vector<HugoOperations*> &ops)
{
  if (ops[0]->startTransaction(ndb) != NDBT_OK)
    return -1;
  NdbTransaction * t= ops[0]->getTransaction();
  for (int i= ops.size()-1; i > 0; i--)
  {
    ops[i]->setTransaction(t,true);
  }
  return 0;
}

static int close_transaction(Ndb *ndb, Vector<HugoOperations*> &ops)
{
  if (ops[0]->closeTransaction(ndb) != NDBT_OK)
    return -1;
  for (int i= ops.size()-1; i > 0; i--)
  {
    ops[i]->setTransaction(NULL,true);
  }
  return 0;
}

static int execute_commit(Ndb *ndb, Vector<HugoOperations*> &ops)
{
  if (ops[0]->execute_Commit(ndb) != NDBT_OK)
    return -1;
  return 0;
}

static int copy_events(Ndb *ndb)
{
  DBUG_ENTER("copy_events");
  int r= 0;
  NdbDictionary::Dictionary * dict = ndb->getDictionary();
  int n_inserts= 0;
  int n_updates= 0;
  int n_deletes= 0;
  while (1)
  {
    int res= ndb->pollEvents(1000); // wait for event or 1000 ms
    DBUG_PRINT("info", ("pollEvents res=%d", res));
    if (res <= 0)
    {
      break;
    }
    NdbEventOperation *pOp;
    while ((pOp= ndb->nextEvent()))
    {
      char buf[1024];
      sprintf(buf, "%s_SHADOW", pOp->getTable()->getName());
      const NdbDictionary::Table *table= dict->getTable(buf);

      if (table == 0)
      {
	g_err << "unable to find table " << buf << endl;
	DBUG_RETURN(-1);
      }

      if (pOp->isOverrun())
      {
	g_err << "buffer overrun\n";
	DBUG_RETURN(-1);
      }
      r++;
      
      if (!pOp->isConsistent()) {
	g_err << "A node failure has occured and events might be missing\n";
	DBUG_RETURN(-1);
      }
	
      int noRetries= 0;
      do
      {
	NdbTransaction *trans= ndb->startTransaction();
	if (trans == 0)
	{
	  g_err << "startTransaction failed "
		<< ndb->getNdbError().code << " "
		<< ndb->getNdbError().message << endl;
	  DBUG_RETURN(-1);
	}
	
	NdbOperation *op= trans->getNdbOperation(table);
	if (op == 0)
	{
	  g_err << "getNdbOperation failed "
		<< trans->getNdbError().code << " "
		<< trans->getNdbError().message << endl;
	  DBUG_RETURN(-1);
	}
	
	switch (pOp->getEventType()) {
	case NdbDictionary::Event::TE_INSERT:
	  if (op->insertTuple())
	  {
	    g_err << "insertTuple "
		  << op->getNdbError().code << " "
		  << op->getNdbError().message << endl;
	    DBUG_RETURN(-1);
	  }
	  if (noRetries == 0)
	  {
	    n_inserts++;
	  }
	  break;
	case NdbDictionary::Event::TE_DELETE:
	  if (op->deleteTuple())
	  {
	    g_err << "deleteTuple "
		  << op->getNdbError().code << " "
		  << op->getNdbError().message << endl;
	    DBUG_RETURN(-1);
	  }
	  if (noRetries == 0)
	  {
	    n_deletes++;
	  }
	  break;
	case NdbDictionary::Event::TE_UPDATE:
	  if (op->updateTuple())
	  {
	    g_err << "updateTuple "
		  << op->getNdbError().code << " "
		  << op->getNdbError().message << endl;
	    DBUG_RETURN(-1);
	  }
	  if (noRetries == 0)
	  {
	    n_updates++;
	  }
	  break;
	default:
	  abort();
	}
	
	{
	  for (const NdbRecAttr *pk= pOp->getFirstPkAttr();
	       pk;
	       pk= pk->next())
	  {
	    if (pk->isNULL())
	    {
	      g_err << "internal error: primary key isNull()="
		    << pk->isNULL() << endl;
	      DBUG_RETURN(NDBT_FAILED);
	    }
	    if (op->equal(pk->getColumn()->getColumnNo(),pk->aRef()))
	    {
	      g_err << "equal " << pk->getColumn()->getColumnNo() << " "
		    << op->getNdbError().code << " "
		    << op->getNdbError().message << endl;
	      DBUG_RETURN(NDBT_FAILED);
	    }
	  }
	}
	
	switch (pOp->getEventType()) {
	case NdbDictionary::Event::TE_INSERT:
	{
	  for (const NdbRecAttr *data= pOp->getFirstDataAttr();
	       data;
	       data= data->next())
	  {
	    if (data->isNULL() < 0 ||
		op->setValue(data->getColumn()->getColumnNo(),
			     data->isNULL() ? 0:data->aRef()))
	    {
	      g_err << "setValue(insert) " << data->getColumn()->getColumnNo()
		    << " " << op->getNdbError().code
		    << " " << op->getNdbError().message << endl;
	      DBUG_RETURN(-1);
	    }
	  }
	  break;
	}
	case NdbDictionary::Event::TE_DELETE:
	  break;
	case NdbDictionary::Event::TE_UPDATE:
	{
	  for (const NdbRecAttr *data= pOp->getFirstDataAttr();
	       data;
	       data= data->next())
	  {
	    if (data->isNULL() >= 0 &&
		op->setValue(data->getColumn()->getColumnNo(),
			     data->isNULL() ? 0:data->aRef()))
	    {
	      g_err << "setValue(update) " << data->getColumn()->getColumnNo()
		    << " " << op->getNdbError().code
		    << " " << op->getNdbError().message << endl;
	      DBUG_RETURN(NDBT_FAILED);
	    }
	  }
	  break;
	}
	default:
	case NdbDictionary::Event::TE_ALL:
	  abort();
	}
	if (trans->execute(Commit) == 0)
	{
	  trans->close();
	  // everything ok
	  break;
	}
	if (noRetries++ == 10 ||
	    trans->getNdbError().status != NdbError::TemporaryError)
	{
	  g_err << "execute " << r << " failed "
		<< trans->getNdbError().code << " "
		<< trans->getNdbError().message << endl;
	  trans->close();
	  DBUG_RETURN(-1);
	}
	trans->close();
	NdbSleep_MilliSleep(100); // sleep before retying
      } while(1);
    } // for
  } // while(1)
  g_info << "n_updates: " << n_updates << " "
	 << "n_inserts: " << n_inserts << " "
	 << "n_deletes: " << n_deletes << endl;
  DBUG_RETURN(r);
}

static int verify_copy(Ndb *ndb,
		       Vector<const NdbDictionary::Table *> &tabs1,
		       Vector<const NdbDictionary::Table *> &tabs2)
{
  for (unsigned i= 0; i < tabs1.size(); i++)
    if (tabs1[i])
    {
      HugoTransactions hugoTrans(*tabs1[i]);
      if (hugoTrans.compare(ndb, tabs2[i]->getName(), 0))
	return -1;
    }
  return 0;
}

static int createEventOperations(Ndb * ndb)
{
  DBUG_ENTER("createEventOperations");
  int i;

  // creat all event ops
  for (i= 0; pTabs[i]; i++)
  {
    char buf[1024];
    sprintf(buf, "%s_EVENT", pTabs[i]->getName());
    NdbEventOperation *pOp= ndb->createEventOperation(buf);
    if ( pOp == NULL )
    {
      DBUG_RETURN(NDBT_FAILED);
    }

    int n_columns= pTabs[i]->getNoOfColumns();
    for (int j = 0; j < n_columns; j++)
    {
      pOp->getValue(pTabs[i]->getColumn(j)->getName());
      pOp->getPreValue(pTabs[i]->getColumn(j)->getName());
    }

    if ( pOp->execute() )
    {
      DBUG_RETURN(NDBT_FAILED);
    }
  }

  DBUG_RETURN(NDBT_OK);
}

static int dropEventOperations(Ndb * ndb)
{
  DBUG_ENTER("dropEventOperations");

  NdbEventOperation *pOp;
  while ( (pOp= ndb->getEventOperation()) )
  {
    if (ndb->dropEventOperation(pOp))
    {
      DBUG_RETURN(NDBT_FAILED);
    }
  }

  DBUG_RETURN(NDBT_OK);
}

static int runMulti(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("runMulti");

  Ndb * ndb= GETNDB(step);

  int no_error= 1;
  int i;

  if (createEventOperations(ndb))
  {
    DBUG_RETURN(NDBT_FAILED);
  }

  // create a hugo operation per table
  Vector<HugoOperations *> hugo_ops;
  for (i= 0; no_error && pTabs[i]; i++)
  {
    hugo_ops.push_back(new HugoOperations(*pTabs[i]));
  }

  int n_records= 3;
  // insert n_records records per table
  do {
    if (start_transaction(ndb, hugo_ops))
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }
    for (i= 0; no_error && pTabs[i]; i++)
    {
      hugo_ops[i]->pkInsertRecord(ndb, 0, n_records);
    }
    if (execute_commit(ndb, hugo_ops))
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }
    if(close_transaction(ndb, hugo_ops))
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }
  } while(0);

  // copy events and verify
  do {
    if (copy_events(ndb) < 0)
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }
    if (verify_copy(ndb, pTabs, pShadowTabs))
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }
  } while (0);

  // update n_records-1 records in first table
  do {
    if (start_transaction(ndb, hugo_ops))
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }

    hugo_ops[0]->pkUpdateRecord(ndb, n_records-1);

    if (execute_commit(ndb, hugo_ops))
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }
    if(close_transaction(ndb, hugo_ops))
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }
  } while(0);

  // copy events and verify
  do {
    if (copy_events(ndb) < 0)
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }
    if (verify_copy(ndb, pTabs, pShadowTabs))
    {
      no_error= 0;
      DBUG_RETURN(NDBT_FAILED);
    }
  } while (0);

  if (dropEventOperations(ndb))
  {
    DBUG_RETURN(NDBT_FAILED);
  }

  if (no_error)
    DBUG_RETURN(NDBT_OK);
  DBUG_RETURN(NDBT_FAILED);
}

static int runMulti_NR(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("runMulti");

  int records = ctx->getNumRecords();
  int loops = ctx->getNumLoops();
  Ndb * ndb= GETNDB(step);

  int i;

  if (createEventOperations(ndb))
  {
    DBUG_RETURN(NDBT_FAILED);
  }

  for (i= 0; pTabs[i]; i++)
  {
    HugoTransactions hugo(*pTabs[i]);
    if (hugo.loadTable(ndb, records, 1, true, 1))
    {
      DBUG_RETURN(NDBT_FAILED);
    }
    // copy events and verify
    if (copy_events(ndb) < 0)
    {
      DBUG_RETURN(NDBT_FAILED);
    }
  }

  if (verify_copy(ndb, pTabs, pShadowTabs))
  {
    DBUG_RETURN(NDBT_FAILED);
  }

  {
    NdbRestarts restarts;
    for (int j= 0; j < loops; j++)
    {
      // restart a node
      int timeout = 240;
      if (restarts.executeRestart("RestartRandomNodeAbort", timeout))
      {
	DBUG_RETURN(NDBT_FAILED);
      }

      sleep(5);
      // update all tables
      for (i= 0; pTabs[i]; i++)
      {
	HugoTransactions hugo(*pTabs[i]);
	if (hugo.pkUpdateRecords(ndb, records, 1, 1))
	{
	  DBUG_RETURN(NDBT_FAILED);
	}
	if (copy_events(ndb) < 0)
	{
	  DBUG_RETURN(NDBT_FAILED);
	}
      }

      // copy events and verify
      if (verify_copy(ndb, pTabs, pShadowTabs))
      {
	DBUG_RETURN(NDBT_FAILED);
      }
    }
  }

  if (dropEventOperations(ndb))
  {
    DBUG_RETURN(NDBT_FAILED);
  }

  DBUG_RETURN(NDBT_OK);
}


NDBT_TESTSUITE(test_event);
TESTCASE("BasicEventOperation", 
	 "Verify that we can listen to Events"
	 "NOTE! No errors are allowed!" )
{
#if 0
  TABLE("T1");
  TABLE("T3");
  TABLE("T5");
  TABLE("T6");
  TABLE("T8");
#endif
  INITIALIZER(runCreateEvent);
  STEP(runEventOperation);
  STEP(runEventLoad);
  FINALIZER(runDropEvent);
}
TESTCASE("CreateDropEventOperation", 
	 "Verify that we can Create and Drop many times"
	 "NOTE! No errors are allowed!" ){
  INITIALIZER(runCreateEvent);
  STEP(runCreateDropEventOperation);
  FINALIZER(runDropEvent);
}
TESTCASE("ParallellEventOperation", 
	 "Verify that we can listen to Events in parallell"
	 "NOTE! No errors are allowed!" ){
  INITIALIZER(runCreateEvent);
  STEP(runEventOperation);
  STEP(runEventOperation);
  STEP(runEventLoad);
  FINALIZER(runDropEvent);
}
TESTCASE("EventOperationApplier", 
	 "Verify that if we apply the data we get from event "
	 "operation is the same as the original table"
	 "NOTE! No errors are allowed!" ){
  INITIALIZER(runCreateEvent);
  INITIALIZER(runCreateShadowTable);
  STEP(runEventApplier);
  STEP(runEventMixedLoad);
  FINALIZER(runDropEvent);
  FINALIZER(runVerify);
  FINALIZER(runDropShadowTable);
}
TESTCASE("EventOperationApplier_NR", 
	 "Verify that if we apply the data we get from event "
	 "operation is the same as the original table"
	 "NOTE! No errors are allowed!" ){
  INITIALIZER(runCreateEvent);
  INITIALIZER(runCreateShadowTable);
  STEP(runEventApplier);
  STEP(runEventMixedLoad);
  STEP(runRestarter);
  FINALIZER(runDropEvent);
  FINALIZER(runVerify);
  FINALIZER(runDropShadowTable);
}
TESTCASE("Multi", 
	 "Verify that we can work with all tables in parallell"
	 "NOTE! HugoOperations::startTransaction, pTrans != NULL errors, "
	 "are allowed!" ){
  ALL_TABLES();
  INITIALIZER(getAllTables);
  INITIALIZER(createAllEvents);
  INITIALIZER(createAllShadows);
  STEP(runMulti);
  FINALIZER(dropAllShadows);
  FINALIZER(dropAllEvents);
}
TESTCASE("Multi_NR", 
	 "Verify that we can work with all tables in parallell"
	 "NOTE! HugoOperations::startTransaction, pTrans != NULL errors, "
	 "are allowed!" ){
  ALL_TABLES();
  INITIALIZER(getAllTables);
  INITIALIZER(createAllEvents);
  INITIALIZER(createAllShadows);
  STEP(runMulti_NR);
  FINALIZER(dropAllShadows);
  FINALIZER(dropAllEvents);
}
NDBT_TESTSUITE_END(test_event);

int main(int argc, const char** argv){
  ndb_init();
  test_event.setCreateAllTables(true);
  return test_event.execute(argc, argv);
}

template class Vector<HugoOperations *>;
template class Vector<NdbEventOperation *>;
template class Vector<NdbRecAttr*>;
template class Vector<Vector<NdbRecAttr*> >;