/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/***************************************************************
* I N C L U D E D   F I L E S                                  *
***************************************************************/

#include <ndb_global.h>

#include "dbGenerator.h"
#include <NdbApi.hpp>
#include <NdbOut.hpp>
#include <NdbSleep.h>

/***************************************************************
* L O C A L   C O N S T A N T S                                *
***************************************************************/

/***************************************************************
* L O C A L   D A T A   S T R U C T U R E S                    *
***************************************************************/

/***************************************************************
* L O C A L   F U N C T I O N S                                *
***************************************************************/

static void getRandomSubscriberNumber(SubscriberNumber number);
static void getRandomServerId(ServerId *serverId);
static void getRandomChangedBy(ChangedBy changedBy);
static void getRandomChangedTime(ChangedTime changedTime);

static void clearTransaction(TransactionDefinition *trans);
static void initGeneratorStatistics(GeneratorStatistics *gen);

static void doOneTransaction(ThreadData * td, 
			     int parallellism,
			     int millisSendPoll,
			     int minEventSendPoll,
			     int forceSendPoll);
static void doTransaction_T1(Ndb * pNDB, ThreadData * td, int async);
static void doTransaction_T2(Ndb * pNDB, ThreadData * td, int async);
static void doTransaction_T3(Ndb * pNDB, ThreadData * td, int async);
static void doTransaction_T4(Ndb * pNDB, ThreadData * td, int async);
static void doTransaction_T5(Ndb * pNDB, ThreadData * td, int async);

/***************************************************************
* L O C A L   D A T A                                          *
***************************************************************/

static SequenceValues transactionDefinition[] = {
   {25, 1},
   {25, 2},
   {20, 3},
   {15, 4},
   {15, 5},
   {0,  0}
};

static SequenceValues rollbackDefinition[] = {
   {98, 0},
   {2 , 1},
   {0,  0}
};

static int maxsize = 0;

/***************************************************************
* P U B L I C   D A T A                                        *
***************************************************************/

/***************************************************************
****************************************************************
* L O C A L   F U N C T I O N S   C O D E   S E C T I O N      *
****************************************************************
***************************************************************/

static void getRandomSubscriberNumber(SubscriberNumber number)
{
   uint32 tmp;
   char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
   tmp = myRandom48(NO_OF_SUBSCRIBERS);
   sprintf(sbuf, "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
   memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
}

static void getRandomServerId(ServerId *serverId)
{
   *serverId = myRandom48(NO_OF_SERVERS);
}

static void getRandomChangedBy(ChangedBy changedBy)
{
   memset(changedBy, myRandom48(26)+'A', CHANGED_BY_LENGTH);
   changedBy[CHANGED_BY_LENGTH] = 0;
}

static void getRandomChangedTime(ChangedTime changedTime)
{
   memset(changedTime, myRandom48(26)+'A', CHANGED_TIME_LENGTH);
   changedTime[CHANGED_TIME_LENGTH] = 0;
}

static void clearTransaction(TransactionDefinition *trans)
{
  trans->count            = 0;
  trans->branchExecuted   = 0;
  trans->rollbackExecuted = 0;
  trans->latencyCounter   = myRandom48(127);
  trans->latency.reset();
}

static int listFull(SessionList *list)
{
   return(list->numberInList == SESSION_LIST_LENGTH);
}

static int listEmpty(SessionList *list)
{
   return(list->numberInList == 0);
}

static void insertSession(SessionList     *list, 
                          SubscriberNumber number,
                          ServerId         serverId)
{
   SessionElement *e;
   if( listFull(list) ) return;

   e = &list->list[list->writeIndex];

   strcpy(e->subscriberNumber, number);
   e->serverId = serverId;

   list->writeIndex = (list->writeIndex + 1) % SESSION_LIST_LENGTH;
   list->numberInList++;

   if( list->numberInList > maxsize )
     maxsize = list->numberInList;
}

static SessionElement *getNextSession(SessionList *list)
{
   if( listEmpty(list) ) return(0);

   return(&list->list[list->readIndex]);
}

static void deleteSession(SessionList *list)
{
   if( listEmpty(list) ) return;

   list->readIndex = (list->readIndex + 1) % SESSION_LIST_LENGTH;
   list->numberInList--;
}

static void initGeneratorStatistics(GeneratorStatistics *gen)
{
   int i;

   if( initSequence(&gen->transactionSequence,
                    transactionDefinition) != 0 ) {
      ndbout_c("could not set the transaction types");
      exit(0);
   }

   if( initSequence(&gen->rollbackSequenceT4,
                    rollbackDefinition) != 0 ) {
      ndbout_c("could not set the rollback sequence");
      exit(0);
   }

   if( initSequence(&gen->rollbackSequenceT5,
                    rollbackDefinition) != 0 ) {
      ndbout_c("could not set the rollback sequence");
      exit(0);
   }

   for(i = 0; i < NUM_TRANSACTION_TYPES; i++ )
      clearTransaction(&gen->transactions[i]);

   gen->totalTransactions = 0;

   gen->activeSessions.numberInList = 0;
   gen->activeSessions.readIndex    = 0;
   gen->activeSessions.writeIndex   = 0;
}


static 
void 
doOneTransaction(ThreadData * td, int p, int millis, int minEvents, int force)
{
  int i;
  unsigned int transactionType;
  int async = 1;
  if (p == 1) {
    async = 0;
  }//if
  for(i = 0; i<p; i++){
    if(td[i].runState == Runnable){
      transactionType = getNextRandom(&td[i].generator.transactionSequence);

      switch(transactionType) {
      case 1:
	doTransaction_T1(td[i].pNDB, &td[i], async);
	break;
      case 2:
	doTransaction_T2(td[i].pNDB, &td[i], async);
	break;
      case 3:
	doTransaction_T3(td[i].pNDB, &td[i], async);
	break;
      case 4:
	doTransaction_T4(td[i].pNDB, &td[i], async);
	break;
      case 5:
	doTransaction_T5(td[i].pNDB, &td[i], async);
	break;
      default:
	ndbout_c("Unknown transaction type: %d", transactionType);
      }
    }
  }
  if (async == 1) {
    td[0].pNDB->sendPollNdb(millis, minEvents, force);
  }//if
}  

static 
void 
doTransaction_T1(Ndb * pNDB, ThreadData * td, int async)
{
  /*----------------*/
  /* Init arguments */
  /*----------------*/
  getRandomSubscriberNumber(td->transactionData.number);
  getRandomChangedBy(td->transactionData.changed_by);
  snprintf(td->transactionData.changed_time,
	   sizeof(td->transactionData.changed_time),
	   "%ld - %d", td->changedTime++, myRandom48(65536*1024));
  //getRandomChangedTime(td->transactionData.changed_time);
  td->transactionData.location = td->transactionData.changed_by[0];
  
  /*-----------------*/
  /* Run transaction */
  /*-----------------*/
  td->runState = Running;
  td->generator.transactions[0].startLatency();

  start_T1(pNDB, td, async);
}

static
void 
doTransaction_T2(Ndb * pNDB, ThreadData * td, int async)
{
  /*----------------*/
  /* Init arguments */
  /*----------------*/
  getRandomSubscriberNumber(td->transactionData.number);

  /*-----------------*/
  /* Run transaction */
  /*-----------------*/
  td->runState = Running;
  td->generator.transactions[1].startLatency();

  start_T2(pNDB, td, async);
}

static
void 
doTransaction_T3(Ndb * pNDB, ThreadData * td, int async)
{
  SessionElement  *se;
  
  /*----------------*/
  /* Init arguments */
  /*----------------*/
  se = getNextSession(&td->generator.activeSessions);
  if( se ) {
    strcpy(td->transactionData.number, se->subscriberNumber);
    td->transactionData.server_id = se->serverId;
    td->transactionData.sessionElement = 1;
  } else {
    getRandomSubscriberNumber(td->transactionData.number);
    getRandomServerId(&td->transactionData.server_id);
    td->transactionData.sessionElement = 0;
  }
  
  td->transactionData.server_bit = (1 << td->transactionData.server_id);

  /*-----------------*/
  /* Run transaction */
  /*-----------------*/
  td->runState = Running;
  td->generator.transactions[2].startLatency();
  start_T3(pNDB, td, async);
}

static 
void 
doTransaction_T4(Ndb * pNDB, ThreadData * td, int async)
{
   /*----------------*/
   /* Init arguments */
   /*----------------*/
  getRandomSubscriberNumber(td->transactionData.number);
  getRandomServerId(&td->transactionData.server_id);
  
  td->transactionData.server_bit = (1 << td->transactionData.server_id);
  td->transactionData.do_rollback = 
    getNextRandom(&td->generator.rollbackSequenceT4);

#if 0
  memset(td->transactionData.session_details, 
	 myRandom48(26)+'A', SESSION_DETAILS_LENGTH);
#endif
  td->transactionData.session_details[SESSION_DETAILS_LENGTH] = 0;
  
  /*-----------------*/
  /* Run transaction */
  /*-----------------*/
  td->runState = Running;
  td->generator.transactions[3].startLatency();
  start_T4(pNDB, td, async);
}

static 
void 
doTransaction_T5(Ndb * pNDB, ThreadData * td, int async)
{
  SessionElement * se;
  se = getNextSession(&td->generator.activeSessions);
  if( se ) {
    strcpy(td->transactionData.number, se->subscriberNumber);
    td->transactionData.server_id = se->serverId;
    td->transactionData.sessionElement = 1;
  }
  else {
    getRandomSubscriberNumber(td->transactionData.number);
    getRandomServerId(&td->transactionData.server_id);
    td->transactionData.sessionElement = 0;
  }
  
  td->transactionData.server_bit = (1 << td->transactionData.server_id);
  td->transactionData.do_rollback  
    = getNextRandom(&td->generator.rollbackSequenceT5);
  
  /*-----------------*/
  /* Run transaction */
  /*-----------------*/
  td->runState = Running;
  td->generator.transactions[4].startLatency();
  start_T5(pNDB, td, async);
}

void
complete_T1(ThreadData * data){
  data->generator.transactions[0].stopLatency();
  data->generator.transactions[0].count++;

  data->runState = Runnable;
  data->generator.totalTransactions++;
}

void 
complete_T2(ThreadData * data){
  data->generator.transactions[1].stopLatency();
  data->generator.transactions[1].count++;

  data->runState = Runnable;
  data->generator.totalTransactions++;
}

void 
complete_T3(ThreadData * data){

  data->generator.transactions[2].stopLatency();
  data->generator.transactions[2].count++;

  if(data->transactionData.branchExecuted)
    data->generator.transactions[2].branchExecuted++;

  data->runState = Runnable;
  data->generator.totalTransactions++;
}

void 
complete_T4(ThreadData * data){

  data->generator.transactions[3].stopLatency();
  data->generator.transactions[3].count++;

  if(data->transactionData.branchExecuted)
    data->generator.transactions[3].branchExecuted++;
  if(data->transactionData.do_rollback)
    data->generator.transactions[3].rollbackExecuted++;
  
  if(data->transactionData.branchExecuted &&
     !data->transactionData.do_rollback){
    insertSession(&data->generator.activeSessions, 
		  data->transactionData.number, 
		  data->transactionData.server_id);
  }

  data->runState = Runnable;
  data->generator.totalTransactions++;

}
void 
complete_T5(ThreadData * data){

  data->generator.transactions[4].stopLatency();
  data->generator.transactions[4].count++;

  if(data->transactionData.branchExecuted)
    data->generator.transactions[4].branchExecuted++;
  if(data->transactionData.do_rollback)
    data->generator.transactions[4].rollbackExecuted++;
  
  if(data->transactionData.sessionElement && 
     !data->transactionData.do_rollback){
    deleteSession(&data->generator.activeSessions);
  }
  
  data->runState = Runnable;
  data->generator.totalTransactions++;
}

/***************************************************************
****************************************************************
* P U B L I C   F U N C T I O N S   C O D E   S E C T I O N    *
****************************************************************
***************************************************************/
void 
asyncGenerator(ThreadData *data, 
	       int parallellism, 
	       int millisSendPoll,
	       int minEventSendPoll,
	       int forceSendPoll)
{
  ThreadData * startUp;
  
  GeneratorStatistics *st;
  double periodStop;
  double benchTimeStart;
  double benchTimeEnd;
  int i, j, done;

  myRandom48Init(data->randomSeed);
  
  for(i = 0; i<parallellism; i++){
    initGeneratorStatistics(&data[i].generator);
   }

  startUp = (ThreadData*)malloc(parallellism * sizeof(ThreadData));
  memcpy(startUp, data, (parallellism * sizeof(ThreadData)));
  
  /*----------------*/
  /* warm up period */
  /*----------------*/
  periodStop = userGetTime() + (double)data[0].warmUpSeconds;
  
  while(userGetTime() < periodStop){
    doOneTransaction(startUp, parallellism, 
		     millisSendPoll, minEventSendPoll, forceSendPoll);
  }
  
  ndbout_c("Waiting for startup to finish");

  /**
   * Wait for all transactions
   */
  done = 0;
  while(!done){
    done = 1;
    for(i = 0; i<parallellism; i++){
      if(startUp[i].runState != Runnable){
	done = 0;
	break;
      }
    }
    if(!done){
      startUp[0].pNDB->sendPollNdb();
    }
  }
  ndbout_c("Benchmark period starts");

  /*-------------------------*/
  /* normal benchmark period */
  /*-------------------------*/
  benchTimeStart = userGetTime();
  
  periodStop = benchTimeStart + (double)data[0].testSeconds;
  while(userGetTime() < periodStop)
    doOneTransaction(data, parallellism,
		     millisSendPoll, minEventSendPoll, forceSendPoll);  

  benchTimeEnd = userGetTime();
  
  ndbout_c("Benchmark period done");

  /**
   * Wait for all transactions
   */
  done = 0;
  while(!done){
    done = 1;
    for(i = 0; i<parallellism; i++){
      if(data[i].runState != Runnable){
	done = 0;
	break;
      }
    }
    if(!done){
      data[0].pNDB->sendPollNdb();
    }
  }

  /*------------------*/
  /* cool down period */
   /*------------------*/
  periodStop = userGetTime() + (double)data[0].coolDownSeconds;
  while(userGetTime() < periodStop){
    doOneTransaction(startUp, parallellism,
		     millisSendPoll, minEventSendPoll, forceSendPoll);
  }

  done = 0;
  while(!done){
    done = 1;
    for(i = 0; i<parallellism; i++){
      if(startUp[i].runState != Runnable){
	done = 0;
	break;
      }
    }
    if(!done){
      startUp[0].pNDB->sendPollNdb();
    }
  }


  /*---------------------------------------------------------*/
  /* add the times for all transaction for inner loop timing */
  /*---------------------------------------------------------*/
  for(j = 0; j<parallellism; j++){
    st = &data[j].generator;
    
    st->outerLoopTime = benchTimeEnd - benchTimeStart;
    st->outerTps      = getTps(st->totalTransactions, st->outerLoopTime);
  }
  /* ndbout_c("maxsize = %d\n",maxsize); */

  free(startUp);
}