/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "Grep.hpp"
#include <ndb_version.h>

#include <NdbTCP.h>
#include <Bitmask.hpp>

#include <signaldata/NodeFailRep.hpp>
#include <signaldata/ReadNodesConf.hpp>
#include <signaldata/CheckNodeGroups.hpp>
#include <signaldata/GrepImpl.hpp>
#include <signaldata/RepImpl.hpp>
#include <signaldata/EventReport.hpp>
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/WaitGCP.hpp>
#include <GrepEvent.hpp>
#include <AttributeHeader.hpp>

#define CONTINUEB_DELAY 500
#define SSREPBLOCKNO 2  
#define PSREPBLOCKNO 2

//#define DEBUG_GREP
//#define DEBUG_GREP_SUBSCRIPTION
//#define DEBUG_GREP_TRANSFER
//#define DEBUG_GREP_APPLY
//#define DEBUG_GREP_DELETE

/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:    STARTUP of GREP Block, etc
 * ------------------------------------------------------------------------
 **************************************************************************/
static Uint32 g_TypeOfStart = NodeState::ST_ILLEGAL_TYPE;
void
Grep::getNodeGroupMembers(Signal* signal) {
  jam();
  /**
   * Ask DIH for nodeGroupMembers
   */
  CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
  sd->blockRef = reference();
  sd->requestType =
    CheckNodeGroups::Direct |
    CheckNodeGroups::GetNodeGroupMembers;
  sd->nodeId = getOwnNodeId();
  EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal, 
		 CheckNodeGroups::SignalLength);
  jamEntry();
  
  c_nodeGroup = sd->output;
  c_noNodesInGroup = 0;
  for (int i = 0; i < MAX_NDB_NODES; i++) {
    if (sd->mask.get(i)) {
      if (i == getOwnNodeId()) c_idInNodeGroup = c_noNodesInGroup;
      c_nodesInGroup[c_noNodesInGroup] = i;
      c_noNodesInGroup++;
    }
  }
  ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup

#ifdef NODEFAIL_DEBUG
  for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
    ndbout_c ("Grep: NodeGroup %u, me %u, me in group %u, member[%u] %u",
	      c_nodeGroup, getOwnNodeId(), c_idInNodeGroup,
	      i, c_nodesInGroup[i]);
  }
#endif
}


void
Grep::execSTTOR(Signal* signal) 
{
  jamEntry();                            
  const Uint32 startphase  = signal->theData[1];
  const Uint32 typeOfStart = signal->theData[7];
  if (startphase == 3) 
  {
    jam();
    signal->theData[0] = reference();
    g_TypeOfStart = typeOfStart;
    sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
    return;
  }
  if(startphase == 5) {
    jam();
    /**
     * we don't want any log/meta records comming to use 
     * until we are done with the recovery.
     */
    if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
      jam();
      pspart.m_recoveryMode =  true;
      getNodeGroupMembers(signal);
      for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
	Uint32 ref =numberToRef(GREP, c_nodesInGroup[i]);
	if (ref != reference())
	  sendSignal(ref, GSN_GREP_START_ME, signal,
		     1 /*SumaStartMe::SignalLength*/, JBB);
      }
    } else  pspart.m_recoveryMode =  false;

  }
 
  if(startphase == 7) {
      jam();
    if (g_TypeOfStart == NodeState::ST_NODE_RESTART) {
      pspart.m_recoveryMode =  false;
    }
  }
  
  sendSTTORRY(signal);
}


void 
Grep::PSPart::execSTART_ME(Signal* signal)
{
  jamEntry();
  GrepStartMe *   me =(GrepStartMe*)signal->getDataPtr();
  BlockReference ref = me->senderRef;
  GrepAddSubReq* const addReq = (GrepAddSubReq *)signal->getDataPtr();  


  SubscriptionPtr subPtr;
  c_subscriptions.first(c_subPtr);
  for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) {
    jam();
    subPtr.i = c_subPtr.curr.i;
    subPtr.p = c_subscriptions.getPtr(subPtr.i);
    addReq->subscriptionId   = subPtr.p->m_subscriptionId;
    addReq->subscriptionKey  = subPtr.p->m_subscriptionKey;
    addReq->subscriberData   = subPtr.p->m_subscriberData;
    addReq->subscriptionType = subPtr.p->m_subscriptionType;
    addReq->senderRef        = subPtr.p->m_coordinatorRef;
    addReq->subscriberRef    =subPtr.p->m_subscriberRef;

    sendSignal(ref, 
	       GSN_GREP_ADD_SUB_REQ, 
	       signal, 
	       GrepAddSubReq::SignalLength,
	       JBB);
  }
  
  addReq->subscriptionId   = 0;
  addReq->subscriptionKey  = 0;
  addReq->subscriberData   = 0;
  addReq->subscriptionType = 0;
  addReq->senderRef        = 0;
  addReq->subscriberRef    = 0;

  sendSignal(ref, 
	     GSN_GREP_ADD_SUB_REQ, 
	     signal, 
	     GrepAddSubReq::SignalLength,
	     JBB);
}

void 
Grep::PSPart::execGREP_ADD_SUB_REQ(Signal* signal)
{
  jamEntry();
  GrepAddSubReq * const grepReq = (GrepAddSubReq *)signal->getDataPtr();
  const Uint32 subId          = grepReq->subscriptionId;
  const Uint32 subKey         = grepReq->subscriptionKey;
  const Uint32 subData        = grepReq->subscriberData;
  const Uint32 subType        = grepReq->subscriptionType;
  const Uint32 coordinatorRef = grepReq->senderRef;

  /**
   * this is ref to the REP node for this subscription.
   */
  const Uint32 subRef         = grepReq->subscriberRef;

  if(subId!=0 && subKey!=0) {
    jam();
    SubscriptionPtr subPtr;
    ndbrequire( c_subscriptionPool.seize(subPtr));
    subPtr.p->m_coordinatorRef    = coordinatorRef;
    subPtr.p->m_subscriptionId    = subId;
    subPtr.p->m_subscriptionKey   = subKey;
    subPtr.p->m_subscriberRef     = subRef;
    subPtr.p->m_subscriberData    = subData;
    subPtr.p->m_subscriptionType  = subType;
    
    c_subscriptions.add(subPtr);
  }
  else  {
    jam();
    GrepAddSubConf * conf = (GrepAddSubConf *)grepReq;
    conf->noOfSub = 
      c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree();
    sendSignal(signal->getSendersBlockRef(),
	       GSN_GREP_ADD_SUB_CONF, 
	       signal, 
	       GrepAddSubConf::SignalLength, 
	       JBB);
  }
}

void 
Grep::PSPart::execGREP_ADD_SUB_REF(Signal* signal)
{
  /**
   * @todo fix error stuff
   */
}

void 
Grep::PSPart::execGREP_ADD_SUB_CONF(Signal* signal)
{
  jamEntry();
  GrepAddSubConf* const conf = (GrepAddSubConf *)signal->getDataPtr();
  Uint32 noOfSubscriptions = conf->noOfSub;
  Uint32 noOfRestoredSubscriptions = 
    c_subscriptionPool.getSize()-c_subscriptionPool.getNoOfFree();
  if(noOfSubscriptions!=noOfRestoredSubscriptions) {
    jam();
    /**
     *@todo send ref signal
     */ 
    ndbrequire(false);
  }
}

void
Grep::execREAD_NODESCONF(Signal* signal) 
{
  jamEntry();
  ReadNodesConf * conf = (ReadNodesConf *)signal->getDataPtr();
  
#if 0
  ndbout_c("Grep: Recd READ_NODESCONF");
#endif
  
  /******************************
   * Check which REP nodes exist
   ******************************/
  Uint32 i;
  for (i = 1; i < MAX_NODES; i++) 
  {
    jam();
#if 0
    ndbout_c("Grep: Found node %d of type %d", i, getNodeInfo(i).getType());
#endif
    if (getNodeInfo(i).getType() == NodeInfo::REP)
    {
      jam();
      /**
       * @todo  This should work for more than ONE rep node!
       */
      pscoord.m_repRef = numberToRef(PSREPBLOCKNO, i);
      pspart.m_repRef = numberToRef(PSREPBLOCKNO, i);
#if 0
      ndbout_c("Grep: REP node %d detected", i);
#endif
    }
  }
  
  /*****************************
   * Check which DB nodes exist
   *****************************/
  m_aliveNodes.clear();

  Uint32 count = 0;
  for(i = 0; i<MAX_NDB_NODES; i++) 
  {
    if (NodeBitmask::get(conf->allNodes, i)) 
    {
      jam();
      count++;

      NodePtr node;
      ndbrequire(m_nodes.seize(node));
      
      node.p->nodeId = i;
      if (NodeBitmask::get(conf->inactiveNodes, i)) 
      {
	node.p->alive = 0;
      } 
      else 
      {
	node.p->alive = 1;
	m_aliveNodes.set(i);
      }
    }
  }
  m_masterNodeId = conf->masterNodeId;
  ndbrequire(count == conf->noOfNodes);
  sendSTTORRY(signal);
}

void
Grep::sendSTTORRY(Signal* signal) 
{
  signal->theData[0] = 0;
  signal->theData[3] = 1;
  signal->theData[4] = 3;
  signal->theData[5] = 5;
  signal->theData[6] = 7;
  signal->theData[7] = 255; // No more start phases from missra
  sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 8, JBB);
}

void
Grep::execNDB_STTOR(Signal* signal) 
{
  jamEntry();                            
}

void
Grep::execDUMP_STATE_ORD(Signal* signal) 
{
  jamEntry();
  //Uint32 tCase = signal->theData[0];

#if 0
  if(sscoord.m_repRef == 0) 
  {
    ndbout << "Grep: Recd DUMP signal but has no connection with REP node"
	   << endl;
    return;
  }
#endif 

  /*
  switch (tCase) 
  {
  case 8100: sscoord.grepReq(signal, GrepReq::START_SUBSCR); break;
  case 8102: sscoord.grepReq(signal, GrepReq::START_METALOG); break;
  case 8104: sscoord.grepReq(signal, GrepReq::START_METASCAN); break;
  case 8106: sscoord.grepReq(signal, GrepReq::START_DATALOG); break;
  case 8108: sscoord.grepReq(signal, GrepReq::START_DATASCAN); break;
  case 8110: sscoord.grepReq(signal, GrepReq::STOP_SUBSCR); break;
  case 8500: sscoord.grepReq(signal, GrepReq::REMOVE_BUFFERS); break;
  case 8300: sscoord.grepReq(signal, GrepReq::SLOWSTOP); break;
  case 8400: sscoord.grepReq(signal, GrepReq::FASTSTOP); break;
  case 8600: sscoord.grepReq(signal, GrepReq::CREATE_SUBSCR); break;
  case 8700: sscoord.dropTable(signal,(Uint32)signal->theData[1]);break;
  default: break;
  }
  */
}

/**
 *  Signal received when REP node has failed
 */
void 
Grep::execAPI_FAILREQ(Signal* signal) 
{
  jamEntry();
  //Uint32          failedApiNode = signal->theData[0];
  //BlockReference  retRef = signal->theData[1];
  
  /**
   * @todo We should probably do something smart if the 
   *       PS REP node fails???? /Lars
   */

#if 0
  ndbout_c("Grep: API_FAILREQ received for API node %d.", failedApiNode);
#endif
  
  /**
   * @note  This signal received is NOT allowed to send any CONF
   *        signal, since this would screw up TC/DICT to API 
   *        "connections".
   */
}

/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:    GREP Control
 * ------------------------------------------------------------------------
 **************************************************************************/
void
Grep::execGREP_REQ(Signal* signal) 
{
  jamEntry();
  
  //GrepReq * req = (GrepReq *)signal->getDataPtr();
  
  /**
   * @todo Fix so that request is redirected to REP Server
   *  Obsolete?
   * Was:   sscoord.grepReq(signal, req->request);
   */
  ndbout_c("Warning! REP commands can only be executed at REP SERVER prompt!");
}


/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:    NODE STATE HANDLING
 * ------------------------------------------------------------------------
 **************************************************************************/
void
Grep::execNODE_FAILREP(Signal* signal) 
{
  jamEntry();
  NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
  bool changed = false;

  NodePtr nodePtr;
  for(m_nodes.first(nodePtr); nodePtr.i != RNIL; m_nodes.next(nodePtr)) 
  {
    jam();
    if (NodeBitmask::get(rep->theNodes, nodePtr.p->nodeId)) 
    {
      jam();
      
      if (nodePtr.p->alive) 
      {
	jam();
	ndbassert(m_aliveNodes.get(nodePtr.p->nodeId));
	changed = true;
      } 
      else 
      {
	ndbassert(!m_aliveNodes.get(nodePtr.p->nodeId));
      }
      
      nodePtr.p->alive = 0;
      m_aliveNodes.clear(nodePtr.p->nodeId);
    }
  }


  /**
   * Problem: Fix a node failure running a protocol
   * 
   * 1. Coordinator node of a protocol dies
   *      - Elect a new coordinator
   *      - send ref to user
   *      
   * 2. Non-coordinator dies.
   *      - make coordinator aware of this
   *        so that coordinator does not wait for 
   *        conf from faulty node
   *      - node recovery will restore the non-coordinator.
   *        
   */
}

void
Grep::execINCL_NODEREQ(Signal* signal) 
{
  jamEntry();
  
  //const Uint32 senderRef = signal->theData[0];
  const Uint32 inclNode  = signal->theData[1];

  NodePtr node;
  for(m_nodes.first(node); node.i != RNIL; m_nodes.next(node)) 
  {
    jam();
    const Uint32 nodeId = node.p->nodeId;
    if (inclNode == nodeId) {
      jam();
      
      ndbrequire(node.p->alive == 0);
      ndbassert(!m_aliveNodes.get(nodeId));
      
      node.p->alive = 1;
      m_aliveNodes.set(nodeId);
      
      break;
    }
  }

  /**
   * @todo:  if we include this DIH's got to be prepared, later if needed...
   */
#if 0 
  signal->theData[0] = reference();
  
  sendSignal(senderRef, GSN_INCL_NODECONF, signal, 1, JBB);
#endif  
}


/**
 * Helper methods 
 */
void 
Grep::PSCoord::prepareOperationRec(SubCoordinatorPtr subPtr, 
				   BlockReference subscriber,
				   Uint32 subId,
				   Uint32 subKey,
				   Uint32 request) 
{
  subPtr.p->m_coordinatorRef     = reference();
  subPtr.p->m_subscriberRef      = subscriber;
  subPtr.p->m_subscriberData     = subPtr.i;
  subPtr.p->m_subscriptionId     = subId;
  subPtr.p->m_subscriptionKey    = subKey;
  subPtr.p->m_outstandingRequest = request;
}


/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:    CREATE SUBSCRIPTION ID
 * ------------------------------------------------------------------------
 * 
 *  Requests SUMA to create a unique subscription id 
 **************************************************************************/

void 
Grep::PSCoord::execGREP_CREATE_SUBID_REQ(Signal* signal) 
{
  jamEntry();

  CreateSubscriptionIdReq * req = 
    (CreateSubscriptionIdReq*)signal->getDataPtr();
  BlockReference ref = signal->getSendersBlockRef();
  
  SubCoordinatorPtr subPtr;
  if( !c_subCoordinatorPool.seize(subPtr)) {
    jam();
    SubCoordinator sub;
    sub.m_subscriberRef   = ref;
    sub.m_subscriptionId  = 0;
    sub.m_subscriptionKey = 0;
    sendRefToSS(signal, sub, GrepError::SUBSCRIPTION_ID_NOMEM );
    return;
  }
  prepareOperationRec(subPtr,
		      ref,
		      0,0,
		      GSN_CREATE_SUBID_REQ);

  
  ndbout_c("SUBID_REQ  Ref %d",ref);
  req->senderData=subPtr.p->m_subscriberData;

  sendSignal(SUMA_REF, GSN_CREATE_SUBID_REQ, signal, 
	     SubCreateReq::SignalLength, JBB);    

#if 1 //def DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSCoord: Sent CREATE_SUBID_REQ to SUMA");
#endif
}

void 
Grep::PSCoord::execCREATE_SUBID_CONF(Signal* signal) 
{
  jamEntry();
  CreateSubscriptionIdConf const * conf = 
    (CreateSubscriptionIdConf *)signal->getDataPtr();
  Uint32 subId    = conf->subscriptionId;
  Uint32 subKey   = conf->subscriptionKey;
  Uint32 subData  = conf->subscriberData;

#if 1 //def DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSCoord: Recd GREP_SUBID_CONF (subId:%d, subKey:%d)", 
	   subId, subKey);
#endif

  SubCoordinatorPtr subPtr;
  c_subCoordinatorPool.getPtr(subPtr, subData);
  BlockReference repRef = subPtr.p->m_subscriberRef;
  
  { // Check that id/key is unique
    SubCoordinator key;
    SubCoordinatorPtr tmp;
    key.m_subscriptionId  = subId;
    key.m_subscriptionKey = subKey;
    if(c_runningSubscriptions.find(tmp, key)){
      jam();
      SubCoordinator sub;
      sub.m_subscriberRef=repRef;
      sub.m_subscriptionId = subId;
      sub.m_subscriptionKey = subKey;
      sendRefToSS(signal,sub, GrepError::SUBSCRIPTION_ID_NOT_UNIQUE );
      return;
    }
  }
  
  sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_CREATE_SUBID_CONF, signal, 
	     CreateSubscriptionIdConf::SignalLength, JBB);
  c_subCoordinatorPool.release(subData);
  
  m_grep->sendEventRep(signal,
			 EventReport::GrepSubscriptionInfo, 
			 GrepEvent::GrepPS_CreateSubIdConf,
			 subId,
			 subKey,
			 (Uint32)GrepError::NO_ERROR);   
}

void 
Grep::PSCoord::execCREATE_SUBID_REF(Signal* signal) {
  jamEntry();
  CreateSubscriptionIdRef const * ref = 
    (CreateSubscriptionIdRef *)signal->getDataPtr();
  Uint32 subData = ref->subscriberData;
  GrepError::Code err;
  
  Uint32 sendersBlockRef = signal->getSendersBlockRef();
  if(sendersBlockRef == SUMA_REF) 
  {
    jam();
    err = GrepError::SUBSCRIPTION_ID_SUMA_FAILED_CREATE;
  } else {
    jam();
    ndbrequire(false); /* Added since errorcode err unhandled
			* TODO: fix correct errorcode
			*/
    err= GrepError::NO_ERROR; // remove compiler warning
  }

  SubCoordinatorPtr subPtr;
  c_runningSubscriptions.getPtr(subPtr, subData);
  BlockReference repref = subPtr.p->m_subscriberRef;
  
  SubCoordinator sub;
  sub.m_subscriberRef   = repref;
  sub.m_subscriptionId  = 0;
  sub.m_subscriptionKey = 0;
  sendRefToSS(signal,sub, err);

}


/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:    CREATE SUBSCRIPTION
 * ------------------------------------------------------------------------
 * 
 *  Creates a subscription for every GREP to its local SUMA.
 *  GREP node that executes createSubscription becomes the GREP Coord.
 **************************************************************************/

/**
 * Request to create a subscription (sent from SS)
 */
void
Grep::PSCoord::execGREP_SUB_CREATE_REQ(Signal* signal) 
{
  jamEntry();
  GrepSubCreateReq const * grepReq = (GrepSubCreateReq *)signal->getDataPtr();
  Uint32 subId           = grepReq->subscriptionId;
  Uint32 subKey          = grepReq->subscriptionKey;
  Uint32 subType         = grepReq->subscriptionType;
  BlockReference rep     = signal->getSendersBlockRef();

  GrepCreateReq * req    =(GrepCreateReq*)grepReq;

  SubCoordinatorPtr subPtr;

  if( !c_subCoordinatorPool.seize(subPtr)) {
    jam();
    SubCoordinator sub;
    sub.m_subscriberRef = rep;
    sub.m_subscriptionId = 0;
    sub.m_subscriptionKey = 0;
    sub.m_outstandingRequest = GSN_GREP_CREATE_REQ;
    sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
    return;
  }
  prepareOperationRec(subPtr,
		      numberToRef(PSREPBLOCKNO, refToNode(rep)), subId, subKey,
		      GSN_GREP_CREATE_REQ);

  /* Get the payload of the signal.
   */
  SegmentedSectionPtr selectedTablesPtr;
  if(subType == SubCreateReq::SelectiveTableSnapshot) {
    jam();
    ndbrequire(signal->getNoOfSections()==1);    
    signal->getSection(selectedTablesPtr,0);
    signal->header.m_noOfSections = 0;
  }
  /**
   * Prepare the signal to be sent to Grep participatns
   */
  subPtr.p->m_subscriptionType = subType;
  req->senderRef        = reference();
  req->subscriberRef    = numberToRef(PSREPBLOCKNO, refToNode(rep));
  req->subscriberData   = subPtr.p->m_subscriberData;
  req->subscriptionId   = subId; 
  req->subscriptionKey  = subKey; 
  req->subscriptionType = subType;

  /*add payload if it is a selectivetablesnap*/
  if(subType == SubCreateReq::SelectiveTableSnapshot) {
    jam();
    signal->setSection(selectedTablesPtr, 0);
  }

  /******************************
   * Send to all PS participants
   ******************************/
  NodeReceiverGroup rg(GREP,  m_grep->m_aliveNodes);
  subPtr.p->m_outstandingParticipants = rg;
  sendSignal(rg,
	     GSN_GREP_CREATE_REQ, signal, 
	     GrepCreateReq::SignalLength, JBB);


#ifdef DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSCoord: Sent GREP_CREATE_REQ "
	   "(subId:%d, subKey:%d, subData:%d, subType:%d) to parts",
	   subId, subKey, subPtr.p->m_subscriberData, subType);
#endif
}

void 
Grep::PSPart::execGREP_CREATE_REQ(Signal* signal) 
{
  jamEntry();
  GrepCreateReq * const grepReq = (GrepCreateReq *)signal->getDataPtr();
  const Uint32 subId          = grepReq->subscriptionId;
  const Uint32 subKey         = grepReq->subscriptionKey;
  const Uint32 subData        = grepReq->subscriberData;
  const Uint32 subType        = grepReq->subscriptionType;
  const Uint32 coordinatorRef = grepReq->senderRef;
  const Uint32 subRef         = grepReq->subscriberRef; //this is ref to the
                                                        //REP node for this 
                                                        //subscription.

  SubscriptionPtr subPtr;
  ndbrequire( c_subscriptionPool.seize(subPtr));
  subPtr.p->m_coordinatorRef     = coordinatorRef;
  subPtr.p->m_subscriptionId     = subId;
  subPtr.p->m_subscriptionKey    = subKey;
  subPtr.p->m_subscriberRef      = subRef;
  subPtr.p->m_subscriberData     = subPtr.i;
  subPtr.p->m_subscriptionType   = subType;
  subPtr.p->m_outstandingRequest = GSN_GREP_CREATE_REQ; 
  subPtr.p->m_operationPtrI      = subData;

  c_subscriptions.add(subPtr);

  SegmentedSectionPtr selectedTablesPtr;
  if(subType == SubCreateReq::SelectiveTableSnapshot) {
    jam();
    ndbrequire(signal->getNoOfSections()==1);
    signal->getSection(selectedTablesPtr,0);// SubCreateReq::TABLE_LIST);
    signal->header.m_noOfSections = 0;
  }

  /**
   * Prepare signal to be sent to SUMA
   */
  SubCreateReq * sumaReq = (SubCreateReq *)grepReq;
  sumaReq->subscriberRef    = GREP_REF;
  sumaReq->subscriberData   = subPtr.p->m_subscriberData;
  sumaReq->subscriptionId   = subPtr.p->m_subscriptionId; 
  sumaReq->subscriptionKey  = subPtr.p->m_subscriptionKey;
  sumaReq->subscriptionType = subPtr.p->m_subscriptionType;
  /*add payload if it is a selectivetablesnap*/
  if(subType == SubCreateReq::SelectiveTableSnapshot) {
    jam();
    signal->setSection(selectedTablesPtr, 0);
  }  
  sendSignal(SUMA_REF, 
	     GSN_SUB_CREATE_REQ, 
	     signal, 
	     SubCreateReq::SignalLength, 
	     JBB);
}

void
Grep::PSPart::execSUB_CREATE_CONF(Signal* signal) 
{
  jamEntry();  

  SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();
  Uint32 subData             = conf->subscriberData;

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subData);
  /**
     @todo check why this can fuck up -johan
     
     ndbrequire(subPtr.p->m_subscriptionId  == conf->subscriptionId);
     ndbrequire(subPtr.p->m_subscriptionKey == conf->subscriptionKey);
  */
#ifdef DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSPart: Recd SUB_CREATE_CONF "
	   "(subId:%d, subKey:%d) from SUMA",
	   conf->subscriptionId, conf->subscriptionKey);
#endif

  /*********************
   * Send conf to coord
   *********************/
  GrepCreateConf * grepConf = (GrepCreateConf*)conf;
  grepConf->senderNodeId = getOwnNodeId();
  grepConf->senderData = subPtr.p->m_operationPtrI;
  sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_CREATE_CONF, signal, 
	     GrepCreateConf::SignalLength, JBB);    
  subPtr.p->m_outstandingRequest = 0; 
}

/**
 * Handle errors that either occured in:
 * 1) PSPart
 * or
 * 2) propagated from local SUMA
 */
void 
Grep::PSPart::execSUB_CREATE_REF(Signal* signal) 
{
  jamEntry();
  SubCreateRef * const ref = (SubCreateRef *)signal->getDataPtr();
  Uint32 subData           = ref->subscriberData;
  GrepError::Code err      = (GrepError::Code)ref->err;
  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subData);
  sendRefToPSCoord(signal, *subPtr.p, err /*error*/);
  subPtr.p->m_outstandingRequest = 0;
}

void 
Grep::PSCoord::execGREP_CREATE_CONF(Signal* signal) 
{
  jamEntry();
  GrepCreateConf const * conf = (GrepCreateConf *)signal->getDataPtr();
  Uint32 subData       = conf->senderData;
  Uint32 nodeId        = conf->senderNodeId;

  SubCoordinatorPtr subPtr;
  c_subCoordinatorPool.getPtr(subPtr, subData);
  
  ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_CREATE_REQ);
  
  subPtr.p->m_outstandingParticipants.clearWaitingFor(nodeId);
  
  if(!subPtr.p->m_outstandingParticipants.done()) return;
  /********************************
   * All participants have CONF:ed
   ********************************/
  Uint32 subId = subPtr.p->m_subscriptionId;
  Uint32 subKey = subPtr.p->m_subscriptionKey;
    
  GrepSubCreateConf * grepConf = (GrepSubCreateConf *)signal->getDataPtr();
  grepConf->subscriptionId  = subId;
  grepConf->subscriptionKey = subKey;
  sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_CREATE_CONF, signal, 
	     GrepSubCreateConf::SignalLength, JBB);

  /**
   * Send event report
   */
  m_grep->sendEventRep(signal,
		       EventReport::GrepSubscriptionInfo,
		       GrepEvent::GrepPS_SubCreateConf,
		       subId,
		       subKey,
		       (Uint32)GrepError::NO_ERROR);

  c_subCoordinatorPool.release(subPtr);

}

/**
 * Handle errors that either occured in:
 * 1) PSCoord
 * or
 * 2) propagated from PSPart 
 */
void 
Grep::PSCoord::execGREP_CREATE_REF(Signal* signal) 
{
  jamEntry();
  GrepCreateRef * const ref = (GrepCreateRef *)signal->getDataPtr();
  Uint32 subData = ref->senderData;
  Uint32 err     = ref->err;
  SubCoordinatorPtr subPtr;
  c_runningSubscriptions.getPtr(subPtr, subData);  
 
  sendRefToSS(signal, *subPtr.p, (GrepError::Code)err /*error*/);
}


/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:    START SUBSCRIPTION
 * ------------------------------------------------------------------------
 * 
 *  Starts a subscription at SUMA.  
 *  Each participant starts its own subscription.
 **************************************************************************/

/**
 * Request to start subscription (Sent from SS)
 */
void
Grep::PSCoord::execGREP_SUB_START_REQ(Signal* signal) 
{
  jamEntry();
  GrepSubStartReq * const subReq = (GrepSubStartReq *)signal->getDataPtr();
  SubscriptionData::Part part    = (SubscriptionData::Part) subReq->part;
  Uint32 subId                   = subReq->subscriptionId;
  Uint32 subKey                  = subReq->subscriptionKey;
  BlockReference rep             = signal->getSendersBlockRef();

  SubCoordinatorPtr subPtr;

  if(!c_subCoordinatorPool.seize(subPtr)) {
    jam();
    SubCoordinator sub;
    sub.m_subscriberRef = rep;
    sub.m_subscriptionId = 0;
    sub.m_subscriptionKey = 0;
    sub.m_outstandingRequest = GSN_GREP_START_REQ;
    sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
    return;
  }
  
  prepareOperationRec(subPtr,
		      numberToRef(PSREPBLOCKNO, refToNode(rep)), 
		      subId, subKey,
		      GSN_GREP_START_REQ);
 
  GrepStartReq * const req    = (GrepStartReq *) subReq;
  req->part                   = (Uint32) part;
  req->subscriptionId         = subPtr.p->m_subscriptionId;
  req->subscriptionKey        = subPtr.p->m_subscriptionKey;
  req->senderData             = subPtr.p->m_subscriberData;

  /***************************
   * Send to all participants
   ***************************/
  NodeReceiverGroup rg(GREP,  m_grep->m_aliveNodes);
  subPtr.p->m_outstandingParticipants = rg;
  sendSignal(rg,
	     GSN_GREP_START_REQ, 
	     signal, 
	     GrepStartReq::SignalLength, JBB);

#ifdef DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSCoord: Sent GREP_START_REQ "
	   "(subId:%d, subKey:%d, senderData:%d, part:%d) to all participants",
	   req->subscriptionId, req->subscriptionKey, req->senderData, part);
#endif
}


void 
Grep::PSPart::execGREP_START_REQ(Signal* signal) 
{
  jamEntry();
  GrepStartReq * const grepReq = (GrepStartReq *) signal->getDataPtr();    
  SubscriptionData::Part part  = (SubscriptionData::Part)grepReq->part;
  Uint32 subId                 = grepReq->subscriptionId;
  Uint32 subKey                = grepReq->subscriptionKey;
  Uint32 operationPtrI         = grepReq->senderData;
  
  Subscription key;
  key.m_subscriptionId        = subId;
  key.m_subscriptionKey       = subKey;
  SubscriptionPtr subPtr;
  ndbrequire(c_subscriptions.find(subPtr, key));;
  subPtr.p->m_outstandingRequest = GSN_GREP_START_REQ; 
  subPtr.p->m_operationPtrI = operationPtrI;
  /**
   * send SUB_START_REQ to local SUMA
   */
  SubStartReq * sumaReq    = (SubStartReq *) grepReq;
  sumaReq->subscriptionId  = subId; 
  sumaReq->subscriptionKey = subKey;
  sumaReq->subscriberData  = subPtr.i;
  sumaReq->part            = (Uint32) part;

  sendSignal(SUMA_REF, GSN_SUB_START_REQ, signal, 
	     SubStartReq::SignalLength, JBB);  
#ifdef DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSPart: Sent SUB_START_REQ (subId:%d, subKey:%d, part:%d)", 
	   subId, subKey, (Uint32)part);
#endif
}


void
Grep::PSPart::execSUB_START_CONF(Signal* signal) 
{
  jamEntry();
  
  SubStartConf * const conf = (SubStartConf *) signal->getDataPtr();
  SubscriptionData::Part part = (SubscriptionData::Part)conf->part;
  Uint32 subId                = conf->subscriptionId;
  Uint32 subKey               = conf->subscriptionKey;
  Uint32 subData              = conf->subscriberData;
  Uint32 firstGCI             = conf->firstGCI;
#ifdef DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSPart: Recd SUB_START_CONF "
	   "(subId:%d, subKey:%d, subData:%d)",
	   subId, subKey, subData);
#endif

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subData);
  ndbrequire(subPtr.p->m_subscriptionId  == subId);
  ndbrequire(subPtr.p->m_subscriptionKey == subKey);

  GrepStartConf * grepConf = (GrepStartConf *)conf;
  grepConf->senderData      = subPtr.p->m_operationPtrI;
  grepConf->part            = (Uint32) part;
  grepConf->subscriptionKey = subKey;
  grepConf->subscriptionId  = subId;
  grepConf->firstGCI        = firstGCI;
  grepConf->senderNodeId    = getOwnNodeId();
  sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_START_CONF, signal, 
	     GrepStartConf::SignalLength, JBB);
  subPtr.p->m_outstandingRequest = 0; 

#ifdef DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSPart: Sent GREP_START_CONF "
	   "(subId:%d, subKey:%d, subData:%d, part:%d)",
	   subId, subKey, subData, part);
#endif
}


/**
 * Handle errors that either occured in:
 * 1) PSPart
 * or
 * 2) propagated from local SUMA
 *  
 * Propagates REF signal to PSCoord
 */
void 
Grep::PSPart::execSUB_START_REF(Signal* signal) 
{
  SubStartRef * const ref = (SubStartRef *)signal->getDataPtr();
  Uint32 subData          = ref->subscriberData;
  GrepError::Code err     = (GrepError::Code)ref->err;
  SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subData);
  sendRefToPSCoord(signal, *subPtr.p, err /*error*/, part);
  subPtr.p->m_outstandingRequest = 0;
}


/**
 *  Logging has started... (says PS Participant)
 */
void 
Grep::PSCoord::execGREP_START_CONF(Signal* signal) 
{
  jamEntry();  

  GrepStartConf * const conf = (GrepStartConf *) signal->getDataPtr();
  Uint32 subData              = conf->senderData;
  SubscriptionData::Part part = (SubscriptionData::Part)conf->part;
  Uint32 subId                = conf->subscriptionId;
  Uint32 subKey               = conf->subscriptionKey;
  Uint32 firstGCI             = conf->firstGCI;

  SubCoordinatorPtr subPtr;
  c_subCoordinatorPool.getPtr(subPtr, subData);
  ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_START_REQ);

  subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId);

  if(!subPtr.p->m_outstandingParticipants.done()) return;
  jam();
  
  /*************************
   * All participants ready 
   *************************/
  GrepSubStartConf * grepConf = (GrepSubStartConf *) conf;
  grepConf->part              = part;
  grepConf->subscriptionId    = subId;
  grepConf->subscriptionKey   = subKey;
  grepConf->firstGCI          = firstGCI;

  bool ok = false;
  switch(part) {
  case SubscriptionData::MetaData:
    ok = true;
    sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal, 
	       GrepSubStartConf::SignalLength, JBB);
    
    /**
     * Send event report
     */
    m_grep->sendEventRep(signal,
			 EventReport::GrepSubscriptionInfo,
			 GrepEvent::GrepPS_SubStartMetaConf,
			 subId, subKey,
			 (Uint32)GrepError::NO_ERROR);
    
    c_subCoordinatorPool.release(subPtr);
    break;
  case SubscriptionData::TableData:
    ok = true;
    sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_START_CONF, signal, 
	       GrepSubStartConf::SignalLength, JBB);

    /**
     * Send event report
     */
    m_grep->sendEventRep(signal,
			 EventReport::GrepSubscriptionInfo,
			 GrepEvent::GrepPS_SubStartDataConf,
			 subId, subKey,
			 (Uint32)GrepError::NO_ERROR);
    

    c_subCoordinatorPool.release(subPtr);
    break;
  }
  ndbrequire(ok);

#ifdef DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSCoord: Recd SUB_START_CONF (subId:%d, subKey:%d, part:%d) "
	   "from all slaves",
	   subId, subKey, (Uint32)part); 
#endif
}

/**
 * Handle errors that either occured in:
 * 1) PSCoord
 * or
 * 2) propagated from PSPart 
 */
void 
Grep::PSCoord::execGREP_START_REF(Signal* signal) 
{
  jamEntry();
  GrepStartRef * const ref = (GrepStartRef *)signal->getDataPtr();
  Uint32 subData           = ref->senderData;
  GrepError::Code err      = (GrepError::Code)ref->err;
  SubscriptionData::Part part  = (SubscriptionData::Part)ref->part;

  SubCoordinatorPtr subPtr;
  c_runningSubscriptions.getPtr(subPtr, subData);  
  sendRefToSS(signal, *subPtr.p, err /*error*/, part);
}
 
/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:    REMOVE SUBSCRIPTION
 * ------------------------------------------------------------------------
 * 
 *  Remove a subscription at SUMA.  
 *  Each participant removes its own subscription.
 *  We start by deleting the subscription inside the requestor
 *  since, we don't know if nodes (REP nodes or DB nodes) 
 *  have disconnected after we sent out this and 
 *  if we dont delete the sub in the requestor now, 
 *  we won't be able to create a new subscription
 **************************************************************************/

/**
 * Request to abort subscription (Sent from SS)
 */
void
Grep::PSCoord::execGREP_SUB_REMOVE_REQ(Signal* signal) 
{
  jamEntry();
  GrepSubRemoveReq * const subReq = (GrepSubRemoveReq *)signal->getDataPtr();
  Uint32 subId           = subReq->subscriptionId;
  Uint32 subKey          = subReq->subscriptionKey;
  BlockReference rep     = signal->getSendersBlockRef(); 

  SubCoordinatorPtr subPtr;
  if( !c_subCoordinatorPool.seize(subPtr)) {
    jam();
    SubCoordinator sub;
    sub.m_subscriberRef = rep;
    sub.m_subscriptionId = 0;
    sub.m_subscriptionKey = 0;
    sub.m_outstandingRequest = GSN_GREP_REMOVE_REQ;
    sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
    return;
  }


  prepareOperationRec(subPtr,
		      numberToRef(PSREPBLOCKNO, refToNode(rep)), 
		      subId, subKey,
		      GSN_GREP_REMOVE_REQ);

  c_runningSubscriptions.add(subPtr);

  GrepRemoveReq * req         = (GrepRemoveReq *) subReq;
  req->subscriptionId         = subPtr.p->m_subscriptionId;
  req->subscriptionKey        = subPtr.p->m_subscriptionKey;
  req->senderData             = subPtr.p->m_subscriberData;
  req->senderRef              = subPtr.p->m_coordinatorRef;

  /***************************
   * Send to all participants
   ***************************/
  NodeReceiverGroup rg(GREP,  m_grep->m_aliveNodes);
  subPtr.p->m_outstandingParticipants = rg;
  sendSignal(rg,
	     GSN_GREP_REMOVE_REQ, signal, 
	     GrepRemoveReq::SignalLength, JBB);
}


void 
Grep::PSPart::execGREP_REMOVE_REQ(Signal* signal)
{
  jamEntry();
  GrepRemoveReq * const grepReq = (GrepRemoveReq *) signal->getDataPtr();    
  Uint32 subId        = grepReq->subscriptionId;
  Uint32 subKey       = grepReq->subscriptionKey;
  Uint32 subData      = grepReq->senderData;
  Uint32 coordinator  = grepReq->senderRef;

  Subscription key;
  key.m_subscriptionId        = subId;
  key.m_subscriptionKey       = subKey;
  SubscriptionPtr subPtr;
  
  if(!c_subscriptions.find(subPtr, key))
    {
      /**
       * The subscription was not found, so it must be deleted.
       * Send CONF back, since it does not exist (thus, it is removed)
       */
      GrepRemoveConf * grepConf = (GrepRemoveConf *)grepReq;
      grepConf->subscriptionKey = subKey;
      grepConf->subscriptionId  = subId;
      grepConf->senderData      = subData;
      grepConf->senderNodeId    = getOwnNodeId();
      sendSignal(coordinator, GSN_GREP_REMOVE_CONF, signal, 
		 GrepRemoveConf::SignalLength, JBB);
      return;      
    }

  subPtr.p->m_operationPtrI = subData;
  subPtr.p->m_coordinatorRef = coordinator;
  subPtr.p->m_outstandingRequest = GSN_GREP_REMOVE_REQ; 

  /**
   * send SUB_REMOVE_REQ to local SUMA
   */
  SubRemoveReq * sumaReq   = (SubRemoveReq *) grepReq;
  sumaReq->subscriptionId  = subId; 
  sumaReq->subscriptionKey = subKey;
  sumaReq->senderData  = subPtr.i;
  sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal, 
	     SubStartReq::SignalLength, JBB);  
}


/**
 * SUB_REMOVE_CONF (from local SUMA)
 */
void
Grep::PSPart::execSUB_REMOVE_CONF(Signal* signal) 
{
  jamEntry();
  SubRemoveConf * const conf = (SubRemoveConf *) signal->getDataPtr();
  Uint32 subId     = conf->subscriptionId;
  Uint32 subKey    = conf->subscriptionKey;
  Uint32 subData   = conf->subscriberData;

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subData);
  ndbrequire(subPtr.p->m_subscriptionId  == subId);
  ndbrequire(subPtr.p->m_subscriptionKey == subKey);
  subPtr.p->m_outstandingRequest = 0; 
  GrepRemoveConf * grepConf = (GrepRemoveConf *)conf;
  grepConf->subscriptionKey = subKey;
  grepConf->subscriptionId  = subId;
  grepConf->senderData      = subPtr.p->m_operationPtrI;
  grepConf->senderNodeId    = getOwnNodeId();
  sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_REMOVE_CONF, signal, 
	     GrepRemoveConf::SignalLength, JBB);
  c_subscriptions.release(subPtr);  

}


/**
 * SUB_REMOVE_CONF (from local SUMA)
 */
void
Grep::PSPart::execSUB_REMOVE_REF(Signal* signal) 
{
  jamEntry();
  SubRemoveRef * const ref = (SubRemoveRef *)signal->getDataPtr();
  Uint32 subData           = ref->subscriberData;
  /*  GrepError::Code err      = (GrepError::Code)ref->err;*/
  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subData);
  
  //sendSubRemoveRef_PSCoord(signal, *subPtr.p, err /*error*/);
}


/**
 *  Aborting has been carried out (says Participants)
 */
void 
Grep::PSCoord::execGREP_REMOVE_CONF(Signal* signal) 
{
  jamEntry();
  GrepRemoveConf * const conf = (GrepRemoveConf *) signal->getDataPtr();
  Uint32 subId                = conf->subscriptionId;
  Uint32 subKey               = conf->subscriptionKey;
  Uint32 senderNodeId         = conf->senderNodeId;
  Uint32 subData              = conf->senderData;
  SubCoordinatorPtr subPtr;
  c_subCoordinatorPool.getPtr(subPtr, subData);

  ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_REMOVE_REQ);
  
  subPtr.p->m_outstandingParticipants.clearWaitingFor(senderNodeId);

  if(!subPtr.p->m_outstandingParticipants.done()) { 
    jam();
    return;
  }
  jam();
  
  /*************************
   * All participants ready 
   *************************/

  m_grep->sendEventRep(signal,
		       EventReport::GrepSubscriptionInfo,
		       GrepEvent::GrepPS_SubRemoveConf,
		       subId, subKey,
		       GrepError::NO_ERROR);

  GrepSubRemoveConf * grepConf = (GrepSubRemoveConf *) conf;
  grepConf->subscriptionId = subId;
  grepConf->subscriptionKey = subKey;
  sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_REMOVE_CONF, signal, 
	     GrepSubRemoveConf::SignalLength, JBB);
  
  c_subCoordinatorPool.release(subPtr);
}



void 
Grep::PSCoord::execGREP_REMOVE_REF(Signal* signal) 
{
  jamEntry();
  GrepRemoveRef * const ref = (GrepRemoveRef *)signal->getDataPtr();
  Uint32 subData = ref->senderData;
  Uint32 err     = ref->err;
  SubCoordinatorPtr subPtr;

  /**
   * Get the operationrecord matching subdata and remove it. Subsequent 
   * execGREP_REMOVE_REF will simply be ignored at this stage.
   */
  for( c_runningSubscriptions.first(c_subPtr); 
       !c_subPtr.isNull(); c_runningSubscriptions.next(c_subPtr)) {
    jam();
    subPtr.i = c_subPtr.curr.i;
    subPtr.p = c_runningSubscriptions.getPtr(subPtr.i);
    if(subData == subPtr.i) 
      {
      sendRefToSS(signal, *subPtr.p, (GrepError::Code)err /*error*/);
      c_runningSubscriptions.release(subPtr);
    return;
    }
  }
  return;
}


/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:       LOG RECORDS (COMING IN FROM LOCAL SUMA)
 * ------------------------------------------------------------------------
 * 
 *  After the subscription is started, we get log records from SUMA.
 *  Both table data and meta data log records are received.
 *
 *  TODO:
 *  @todo Changes in meta data is currently not 
 *	  allowed during global replication
 **************************************************************************/

void
Grep::PSPart::execSUB_META_DATA(Signal* signal) 
{
  jamEntry();
  if(m_recoveryMode) {
    jam();
    return;
  }
  /**
   * METASCAN and METALOG
   */
  SubMetaData * data = (SubMetaData *) signal->getDataPtrSend();
  SubscriptionPtr subPtr;  
  c_subscriptions.getPtr(subPtr, data->subscriberData);

  /***************************
   * Forward data to REP node
   ***************************/
  sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_META_DATA, signal, 
	     SubMetaData::SignalLength, JBB);
#ifdef DEBUG_GREP_SUBSCRIPTION
  ndbout_c("Grep::PSPart: Sent SUB_META_DATA to REP "
	   "(TableId: %d, SenderData: %d, GCI: %d)",
	   data->tableId, data->senderData, data->gci);
#endif
}

/**
 * Receive table data from SUMA and dispatches it to REP node.
 */
void
Grep::PSPart::execSUB_TABLE_DATA(Signal* signal) 
{
  jamEntry();
  if(m_recoveryMode) {
    jam();
    return;
  }
  ndbrequire(m_repRef!=0);
  
  if(!assembleFragments(signal)) { jam(); return; }
  
  /**
   * Check if it is SCAN or LOG data that has arrived
   */
  if(signal->getNoOfSections() == 2)
  {
    jam();
    /**
     * DATASCAN - Not marked with GCI, so mark with latest seen GCI 
     */
    if(m_firstScanGCI == 1 && m_lastScanGCI == 0) {
      m_firstScanGCI = m_latestSeenGCI;
      m_lastScanGCI = m_latestSeenGCI;
    }
    SubTableData * data = (SubTableData*)signal->getDataPtrSend();
    Uint32 subData      = data->senderData;
    data->gci           = m_latestSeenGCI;  
    data->logType       = SubTableData::SCAN;
    
    SubscriptionPtr subPtr;
    c_subscriptions.getPtr(subPtr, subData);
    sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA, signal, 
	       SubTableData::SignalLength, JBB);
#ifdef DEBUG_GREP
    ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Scan, GCI: %d)", 
	     data->gci);
#endif
  } 
  else 
  {
    jam();
    /**
     * DATALOG (TRIGGER) - Already marked with GCI
     */
    SubTableData * data = (SubTableData*)signal->getDataPtrSend();
    data->logType       = SubTableData::LOG;
    Uint32 subData      = data->senderData;
    if (data->gci > m_latestSeenGCI) m_latestSeenGCI = data->gci;

    // Reformat to sections and send to replication node.
    LinearSectionPtr ptr[3];
    ptr[0].p  =  signal->theData + 25;
    ptr[0].sz =  data->noOfAttributes;
    ptr[1].p  =  signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
    ptr[1].sz =  data->dataSize;

    SubscriptionPtr subPtr;
    c_subscriptions.getPtr(subPtr, subData);
    sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_TABLE_DATA,
	       signal, SubTableData::SignalLength, JBB, ptr, 2);       
#ifdef DEBUG_GREP
    ndbout_c("Grep::PSPart: Sent SUB_TABLE_DATA (Log, GCI: %d)", 
	     data->gci);
#endif
  }
}


/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:    START SYNCHRONIZATION
 * ------------------------------------------------------------------------
 * 
 *  
 **************************************************************************/

/**
 * Request to start sync (from Rep SS)
 */
void
Grep::PSCoord::execGREP_SUB_SYNC_REQ(Signal* signal) 
{
  jamEntry();
  GrepSubSyncReq * const subReq = (GrepSubSyncReq*)signal->getDataPtr();
  SubscriptionData::Part part   = (SubscriptionData::Part) subReq->part;
  Uint32 subId                  = subReq->subscriptionId;
  Uint32 subKey                 = subReq->subscriptionKey;
  BlockReference rep            = signal->getSendersBlockRef(); 

  SubCoordinatorPtr subPtr;
  if( !c_subCoordinatorPool.seize(subPtr)) {
    jam();
    SubCoordinator sub;
    sub.m_subscriberRef = rep;
    sub.m_subscriptionId = 0;
    sub.m_subscriptionKey = 0;
    sub.m_outstandingRequest = GSN_GREP_SYNC_REQ;
    sendRefToSS(signal, sub, GrepError::NOSPACE_IN_POOL);
    return;
  }

  prepareOperationRec(subPtr,
		      numberToRef(PSREPBLOCKNO, refToNode(rep)), 
		      subId, subKey,
		      GSN_GREP_SYNC_REQ);

  GrepSyncReq * req = (GrepSyncReq *)subReq;
  req->subscriptionId   = subPtr.p->m_subscriptionId;
  req->subscriptionKey  = subPtr.p->m_subscriptionKey;
  req->senderData       = subPtr.p->m_subscriberData;
  req->part             = (Uint32)part;
  
  /***************************
   * Send to all participants
   ***************************/
  NodeReceiverGroup rg(GREP,  m_grep->m_aliveNodes);
  subPtr.p->m_outstandingParticipants = rg;
  sendSignal(rg,
	     GSN_GREP_SYNC_REQ, signal, GrepSyncReq::SignalLength, JBB);
}


/**
 *  Sync req from Grep::PSCoord to PS particpant 
 */
void 
Grep::PSPart::execGREP_SYNC_REQ(Signal* signal) 
{
  jamEntry();
  
  GrepSyncReq * const grepReq = (GrepSyncReq *) signal->getDataPtr();    
  Uint32 part                 = grepReq->part;
  Uint32 subId                = grepReq->subscriptionId;
  Uint32 subKey               = grepReq->subscriptionKey;
  Uint32 subData              = grepReq->senderData;

  Subscription key;
  key.m_subscriptionId        = subId;
  key.m_subscriptionKey       = subKey;
  SubscriptionPtr subPtr;
  ndbrequire(c_subscriptions.find(subPtr, key));
  subPtr.p->m_operationPtrI   = subData;
  subPtr.p->m_outstandingRequest = GSN_GREP_SYNC_REQ; 
  /**********************************
   * Send SUB_SYNC_REQ to local SUMA
   **********************************/
  SubSyncReq * sumaReq      = (SubSyncReq *)grepReq;    
  sumaReq->subscriptionId   = subId; 
  sumaReq->subscriptionKey  = subKey;
  sumaReq->subscriberData   = subPtr.i;
  sumaReq->part             = part;
  sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ, signal, 
	     SubSyncReq::SignalLength, JBB);
}


/**
 * SYNC conf from SUMA
 */
void 
Grep::PSPart::execSUB_SYNC_CONF(Signal* signal) 
{
  jamEntry();
  
  SubSyncConf * const conf = (SubSyncConf *) signal->getDataPtr();
  Uint32 part              = conf->part;
  Uint32 subId             = conf->subscriptionId;
  Uint32 subKey            = conf->subscriptionKey;
  Uint32 subData           = conf->subscriberData;

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subData);

  ndbrequire(subPtr.p->m_subscriptionId  == subId);
  ndbrequire(subPtr.p->m_subscriptionKey == subKey);
  
  GrepSyncConf * grepConf     = (GrepSyncConf *)conf;
  grepConf->senderNodeId      = getOwnNodeId();
  grepConf->part              = part;
  grepConf->firstGCI          = m_firstScanGCI;
  grepConf->lastGCI           = m_lastScanGCI;
  grepConf->subscriptionId    = subId;
  grepConf->subscriptionKey   = subKey;
  grepConf->senderData        = subPtr.p->m_operationPtrI;
  sendSignal(subPtr.p->m_coordinatorRef, GSN_GREP_SYNC_CONF, signal, 
	     GrepSyncConf::SignalLength, JBB);

  m_firstScanGCI = 1;
  m_lastScanGCI = 0;
  subPtr.p->m_outstandingRequest = 0;
}

/**
 * Handle errors that either occured in:
 * 1) PSPart
 * or
 * 2) propagated from local SUMA
 *  
 * Propagates REF signal to PSCoord
 */
void 
Grep::PSPart::execSUB_SYNC_REF(Signal* signal) {
  jamEntry();
  SubSyncRef * const ref = (SubSyncRef *)signal->getDataPtr();
  Uint32 subData              = ref->subscriberData;
  GrepError::Code err     = (GrepError::Code)ref->err;
  SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
  
  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subData);
  sendRefToPSCoord(signal, *subPtr.p, err /*error*/ ,part);
  subPtr.p->m_outstandingRequest = 0;
}

/**
 *  Syncing has started... (says PS Participant)
 */
void 
Grep::PSCoord::execGREP_SYNC_CONF(Signal* signal) 
{
  jamEntry();

  GrepSyncConf const * conf = (GrepSyncConf *)signal->getDataPtr();
  Uint32 part               = conf->part;
  Uint32 firstGCI           = conf->firstGCI;
  Uint32 lastGCI            = conf->lastGCI;
  Uint32 subId              = conf->subscriptionId;
  Uint32 subKey             = conf->subscriptionKey;
  Uint32 subData            = conf->senderData;
  
  SubCoordinatorPtr subPtr;
  c_subCoordinatorPool.getPtr(subPtr, subData);
  ndbrequire(subPtr.p->m_outstandingRequest == GSN_GREP_SYNC_REQ);
  
  subPtr.p->m_outstandingParticipants.clearWaitingFor(conf->senderNodeId);
  if(!subPtr.p->m_outstandingParticipants.done()) return;

  /**
   * Send event
   */
  GrepEvent::Subscription event;
  if(part == SubscriptionData::MetaData) 
    event = GrepEvent::GrepPS_SubSyncMetaConf;
  else
    event = GrepEvent::GrepPS_SubSyncDataConf;
  
  /* @todo Johan: Add firstGCI here. /Lars */
  m_grep->sendEventRep(signal, EventReport::GrepSubscriptionInfo,
		       event, subId, subKey,
		       (Uint32)GrepError::NO_ERROR,
		       lastGCI);

  /*************************
   * All participants ready 
   *************************/
  GrepSubSyncConf * grepConf = (GrepSubSyncConf *)conf;
  grepConf->part             = part;
  grepConf->firstGCI         = firstGCI;
  grepConf->lastGCI          = lastGCI;
  grepConf->subscriptionId   = subId;
  grepConf->subscriptionKey  = subKey;

  sendSignal(subPtr.p->m_subscriberRef, GSN_GREP_SUB_SYNC_CONF, signal, 
	     GrepSubSyncConf::SignalLength, JBB);
  c_subCoordinatorPool.release(subPtr);
}

/**
 * Handle errors that either occured in:
 * 1) PSCoord
 * or
 * 2) propagated from PSPart 
 */
void 
Grep::PSCoord::execGREP_SYNC_REF(Signal* signal) {
  jamEntry();
  GrepSyncRef * const ref = (GrepSyncRef *)signal->getDataPtr();
  Uint32 subData              = ref->senderData;
  SubscriptionData::Part part = (SubscriptionData::Part)ref->part;
  GrepError::Code err         = (GrepError::Code)ref->err;
  SubCoordinatorPtr subPtr;
  c_runningSubscriptions.getPtr(subPtr, subData);  
  sendRefToSS(signal, *subPtr.p, err /*error*/, part);
}



void
Grep::PSCoord::sendRefToSS(Signal * signal, 
			   SubCoordinator sub,
			   GrepError::Code err,
			   SubscriptionData::Part part) {
  /**
  
    GrepCreateRef * ref = (GrepCreateRef*)signal->getDataPtrSend();
    ref->senderData = sub.m_subscriberData;
    ref->subscriptionId = sub.m_subscriptionId;
    ref->subscriptionKey = sub.m_subscriptionKey;
    ref->err = err;
    sendSignal(sub.m_coordinatorRef, GSN_GREP_CREATE_REF, signal, 
	       GrepCreateRef::SignalLength, JBB);    
*/

  jam();
  GrepEvent::Subscription event;
  switch(sub.m_outstandingRequest) {
  case GSN_GREP_CREATE_SUBID_REQ: 
    {
      jam();
      CreateSubscriptionIdRef * ref = 
	(CreateSubscriptionIdRef*)signal->getDataPtrSend();
      ref->err             = (Uint32)err;
      ref->subscriptionId  = sub.m_subscriptionId;
      ref->subscriptionKey = sub.m_subscriptionKey;
      sendSignal(sub.m_subscriberRef, 
		 GSN_GREP_CREATE_SUBID_REF,
		 signal,
		 CreateSubscriptionIdRef::SignalLength,
		 JBB);
      event = GrepEvent::GrepPS_CreateSubIdRef;
    }
    break;
  case GSN_GREP_CREATE_REQ: 
    {
      jam();
      GrepSubCreateRef * ref = (GrepSubCreateRef*)signal->getDataPtrSend();
      ref->err = (Uint32)err;
      ref->subscriptionId  = sub.m_subscriptionId;
      ref->subscriptionKey = sub.m_subscriptionKey;
      sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_CREATE_REF, signal,
		 GrepSubCreateRef::SignalLength, JBB);
      event = GrepEvent::GrepPS_SubCreateRef;
    }
    break;
  case GSN_GREP_SYNC_REQ: 
    {
      jam();
      GrepSubSyncRef * ref = (GrepSubSyncRef*)signal->getDataPtrSend(); 
      ref->err = (Uint32)err;
      ref->subscriptionId  = sub.m_subscriptionId;
      ref->subscriptionKey = sub.m_subscriptionKey;
      ref->part            = (SubscriptionData::Part) part;
      sendSignal(sub.m_subscriberRef, 
		 GSN_GREP_SUB_SYNC_REF,
		 signal,
		 GrepSubSyncRef::SignalLength,
		 JBB);
      if(part == SubscriptionData::MetaData) 
	event = GrepEvent::GrepPS_SubSyncMetaRef;
      else
	event = GrepEvent::GrepPS_SubSyncDataRef;
    }
    break;
  case GSN_GREP_START_REQ:  
    {
      jam();
      GrepSubStartRef * ref = (GrepSubStartRef*)signal->getDataPtrSend();
      ref->err = (Uint32)err;
      ref->subscriptionId  = sub.m_subscriptionId;
      ref->subscriptionKey = sub.m_subscriptionKey;
      
      sendSignal(sub.m_subscriberRef, GSN_GREP_SUB_START_REF,
		 signal, GrepSubStartRef::SignalLength, JBB);
      if(part == SubscriptionData::MetaData) 
	event = GrepEvent::GrepPS_SubStartMetaRef;
      else
	event = GrepEvent::GrepPS_SubStartDataRef;  
      /**
       * Send event report
       */
      m_grep->sendEventRep(signal,
			   EventReport::GrepSubscriptionAlert,
			   event,
			   sub.m_subscriptionId,
			   sub.m_subscriptionKey,
			   (Uint32)err);
    }
    break;
  case GSN_GREP_REMOVE_REQ:
    {
      jam();
      GrepSubRemoveRef * ref = (GrepSubRemoveRef*)signal->getDataPtrSend();
      ref->subscriptionId  = sub.m_subscriptionId;
      ref->subscriptionKey = sub.m_subscriptionKey;
      ref->err             = (Uint32)err;
      
      sendSignal(sub.m_subscriberRef, 
		 GSN_GREP_SUB_REMOVE_REF,
		 signal,
		 GrepSubRemoveRef::SignalLength,
		 JBB);
      
      event = GrepEvent::GrepPS_SubRemoveRef;   
    }
    break;
  default:
    ndbrequire(false);
    event= GrepEvent::Rep_Disconnect; // remove compiler warning
  }  
  /**
   * Finally, send an event.
   */
  m_grep->sendEventRep(signal,
		       EventReport::GrepSubscriptionAlert,
		       event,
		       sub.m_subscriptionId,
		       sub.m_subscriptionKey,
		       err);
 
}


void
Grep::PSPart::sendRefToPSCoord(Signal * signal, 
			       Subscription sub,
			       GrepError::Code err,
			       SubscriptionData::Part part) {

  jam();
  GrepEvent::Subscription event;
  switch(sub.m_outstandingRequest) {
    
  case GSN_GREP_CREATE_REQ: 
    {
      GrepCreateRef * ref = (GrepCreateRef*)signal->getDataPtrSend();
      ref->senderData = sub.m_subscriberData;
      ref->subscriptionId = sub.m_subscriptionId;
      ref->subscriptionKey = sub.m_subscriptionKey;
      ref->err = err;
      sendSignal(sub.m_coordinatorRef, GSN_GREP_CREATE_REF, signal, 
		 GrepCreateRef::SignalLength, JBB);    
      
      event =  GrepEvent::GrepPS_SubCreateRef;
    }
    break;
  case GSN_GREP_SYNC_REQ: 
    {
      GrepSyncRef * ref = (GrepSyncRef*)signal->getDataPtrSend();
      ref->senderData = sub.m_subscriberData;
      ref->subscriptionId = sub.m_subscriptionId;
      ref->subscriptionKey = sub.m_subscriptionKey;
      ref->part = part;
      ref->err = err;
      sendSignal(sub.m_coordinatorRef, 
		 GSN_GREP_SYNC_REF, signal, 
		 GrepSyncRef::SignalLength, JBB);    
      if(part == SubscriptionData::MetaData) 
	event = GrepEvent::GrepPS_SubSyncMetaRef;
      else
	event = GrepEvent::GrepPS_SubSyncDataRef;    
    }
    break;
  case GSN_GREP_START_REQ:  
    {
      jam();
      GrepStartRef * ref = (GrepStartRef*)signal->getDataPtrSend();
      ref->senderData = sub.m_subscriberData;
      ref->subscriptionId = sub.m_subscriptionId;
      ref->subscriptionKey = sub.m_subscriptionKey;
      ref->part = (Uint32) part;
      ref->err = err;
      sendSignal(sub.m_coordinatorRef, GSN_GREP_START_REF, signal, 
		 GrepStartRef::SignalLength, JBB);			    
      if(part == SubscriptionData::MetaData) 
	event = GrepEvent::GrepPS_SubStartMetaRef;
      else 
	event = GrepEvent::GrepPS_SubStartDataRef;    
    }
    break;

  case GSN_GREP_REMOVE_REQ:
    {
      jamEntry();
      GrepRemoveRef * ref  = (GrepRemoveRef*)signal->getDataPtrSend();
      ref->senderData      = sub.m_operationPtrI;
      ref->subscriptionId  = sub.m_subscriptionId;
      ref->subscriptionKey = sub.m_subscriptionKey;
      ref->err             = err;
      sendSignal(sub.m_coordinatorRef, GSN_GREP_REMOVE_REF, signal, 
		 GrepCreateRef::SignalLength, JBB);    
      
    }
    break;
  default:
    ndbrequire(false);
    event= GrepEvent::Rep_Disconnect; // remove compiler warning
  }
  
  /**
   * Finally, send an event.
   */
  m_grep->sendEventRep(signal,
		       EventReport::GrepSubscriptionAlert,
		       event,
		       sub.m_subscriptionId,
		       sub.m_subscriptionKey,
		       err);
 
}

/**************************************************************************
 * ------------------------------------------------------------------------
 *  MODULE:    GREP PS Coordinator GCP 
 * ------------------------------------------------------------------------
 * 
 *  
 **************************************************************************/

void 
Grep::PSPart::execSUB_GCP_COMPLETE_REP(Signal* signal) 
{
  jamEntry();
  if(m_recoveryMode) {
    jam();
    return;
  }
  SubGcpCompleteRep * rep  = (SubGcpCompleteRep *)signal->getDataPtrSend();
  rep->senderRef           = reference();

  if (rep->gci > m_latestSeenGCI) m_latestSeenGCI = rep->gci;
  SubscriptionPtr subPtr;
  c_subscriptions.first(c_subPtr);
  for(; !c_subPtr.isNull(); c_subscriptions.next(c_subPtr)) {
    
    subPtr.i = c_subPtr.curr.i;
    subPtr.p = c_subscriptions.getPtr(subPtr.i);  
    sendSignal(subPtr.p->m_subscriberRef, GSN_SUB_GCP_COMPLETE_REP, signal, 
	       SubGcpCompleteRep::SignalLength, JBB);
  }

#ifdef DEBUG_GREP
  ndbout_c("Grep::PSPart: Recd SUB_GCP_COMPLETE_REP "
	   "(GCI: %d, nodeId: %d) from SUMA", 
	   rep->gci, refToNode(rep->senderRef));
#endif
}


void
Grep::PSPart::execSUB_SYNC_CONTINUE_REQ(Signal* signal) 
{
  jamEntry();
  SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtr();
  Uint32 subData                 = req->subscriberData;

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr,subData);
  
  /**
   * @todo Figure out how to control how much data we can receive?
   */
  SubSyncContinueConf * conf = (SubSyncContinueConf*)req;
  conf->subscriptionId       = subPtr.p->m_subscriptionId;
  conf->subscriptionKey      = subPtr.p->m_subscriptionKey;
  sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal, 
	     SubSyncContinueConf::SignalLength, JBB);  
}

void
Grep::sendEventRep(Signal * signal,
		   EventReport::EventType type, 
		   GrepEvent::Subscription event,
		   Uint32 subId,
		   Uint32 subKey,
		   Uint32 err,
		   Uint32 other) {
  jam();
  signal->theData[0] = type;
  signal->theData[1] = event;
  signal->theData[2] = subId;
  signal->theData[3] = subKey; 
  signal->theData[4] = err;
  
  if(other==0)
    sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5 ,JBB);        
  else {
    signal->theData[5] = other;
    sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 6 ,JBB);        
  }
}