/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>

#include "Packer.hpp"
#include <TransporterRegistry.hpp>
#include <TransporterCallback.hpp>
#include <RefConvert.hpp>

#define MAX_RECEIVED_SIGNALS 1024
Uint32
TransporterRegistry::unpack(Uint32 * readPtr,
			    Uint32 sizeOfData,
			    NodeId remoteNodeId,
			    IOState state) {
  SignalHeader signalHeader;
  LinearSectionPtr ptr[3];
  
  Uint32 usedData   = 0;
  Uint32 loop_count = 0; 
 
  if(state == NoHalt || state == HaltOutput){
    while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
           (loop_count < MAX_RECEIVED_SIGNALS)) {
      Uint32 word1 = readPtr[0];
      Uint32 word2 = readPtr[1];
      Uint32 word3 = readPtr[2];
      loop_count++;
      
#if 0
      if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
	//Do funky stuff
      }
#endif

      const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
      const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;

      if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){
        DEBUG("Message Size = " << messageLenBytes);
	reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
        return usedData;
      }//if
      
      if (sizeOfData < messageLenBytes) {
	break;
      }//if
      
      if(Protocol6::getCheckSumIncluded(word1)){
	const Uint32 tmpLen = messageLen32 - 1;
	const Uint32 checkSumSent     = readPtr[tmpLen];
	const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
	
	if(checkSumComputed != checkSumSent){
	  reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM);
          return usedData;
	}//if
      }//if
      
#if 0
      if(Protocol6::getCompressed(word1)){
	//Do funky stuff
      }//if
#endif
      
      Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
      
      Uint32 sBlockNum = signalHeader.theSendersBlockRef;
      sBlockNum = numberToRef(sBlockNum, remoteNodeId);
      signalHeader.theSendersBlockRef = sBlockNum;
      
      Uint8 prio = Protocol6::getPrio(word1);
      
      Uint32 * signalData = &readPtr[3];
      
      if(Protocol6::getSignalIdIncluded(word1) == 0){
	signalHeader.theSendersSignalId = ~0;
      } else {
	signalHeader.theSendersSignalId = * signalData;
	signalData ++;
      }//if
      
      Uint32 * sectionPtr = signalData + signalHeader.theLength;
      Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
      for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
	Uint32 sz = * sectionPtr;
	ptr[i].sz = sz;
	ptr[i].p = sectionData;
	
	sectionPtr ++;
	sectionData += sz;
      }

      execute(callbackObj, &signalHeader, prio, signalData, ptr);
      
      readPtr     += messageLen32;
      sizeOfData  -= messageLenBytes;
      usedData    += messageLenBytes;
    }//while
    
    return usedData;
  } else {
    /** state = HaltIO || state == HaltInput */

    while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
           (loop_count < MAX_RECEIVED_SIGNALS)) {
      Uint32 word1 = readPtr[0];
      Uint32 word2 = readPtr[1];
      Uint32 word3 = readPtr[2];
      loop_count++;
      
#if 0
      if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
	//Do funky stuff
      }//if
#endif
      
      const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
      const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
      if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){
	DEBUG("Message Size = " << messageLenBytes);
	reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
        return usedData;
      }//if
      
      if (sizeOfData < messageLenBytes) {
	break;
      }//if
      
      if(Protocol6::getCheckSumIncluded(word1)){
	const Uint32 tmpLen = messageLen32 - 1;
	const Uint32 checkSumSent     = readPtr[tmpLen];
	const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
	
	if(checkSumComputed != checkSumSent){
	  
	  //theTransporters[remoteNodeId]->disconnect();
	  reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM);
          return usedData;
	}//if
      }//if
      
#if 0
      if(Protocol6::getCompressed(word1)){
	//Do funky stuff
      }//if
#endif
      
      Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
      
      Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;

      if(rBlockNum == 252){
	Uint32 sBlockNum = signalHeader.theSendersBlockRef;
	sBlockNum = numberToRef(sBlockNum, remoteNodeId);
	signalHeader.theSendersBlockRef = sBlockNum;
	
	Uint8 prio = Protocol6::getPrio(word1);
	
	Uint32 * signalData = &readPtr[3];
	
	if(Protocol6::getSignalIdIncluded(word1) == 0){
	  signalHeader.theSendersSignalId = ~0;
	} else {
	  signalHeader.theSendersSignalId = * signalData;
	  signalData ++;
	}//if
	
	Uint32 * sectionPtr = signalData + signalHeader.theLength;
	Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
	for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
	  Uint32 sz = * sectionPtr;
	  ptr[i].sz = sz;
	  ptr[i].p = sectionData;
	  
	  sectionPtr ++;
	  sectionData += sz;
	}

	execute(callbackObj, &signalHeader, prio, signalData, ptr);
      } else {
	DEBUG("prepareReceive(...) - Discarding message to block: "
	      << rBlockNum << " from Node: " << remoteNodeId);
      }//if
      
      readPtr     += messageLen32;
      sizeOfData  -= messageLenBytes;
      usedData    += messageLenBytes;
    }//while
    

    return usedData;
  }//if
}

Uint32 *
TransporterRegistry::unpack(Uint32 * readPtr,
			    Uint32 * eodPtr,
			    NodeId remoteNodeId,
			    IOState state) {
  static SignalHeader signalHeader;
  static LinearSectionPtr ptr[3];
  Uint32 loop_count = 0;
  if(state == NoHalt || state == HaltOutput){
    while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
      Uint32 word1 = readPtr[0];
      Uint32 word2 = readPtr[1];
      Uint32 word3 = readPtr[2];
      loop_count++; 
#if 0
      if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
	//Do funky stuff
      }
#endif
      
      const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
      
      if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){
        DEBUG("Message Size(words) = " << messageLen32);
	reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
        return readPtr;
      }//if
      
      if(Protocol6::getCheckSumIncluded(word1)){
	const Uint32 tmpLen = messageLen32 - 1;
	const Uint32 checkSumSent     = readPtr[tmpLen];
	const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
	
	if(checkSumComputed != checkSumSent){
	  reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM);
	  return readPtr;
	}//if
      }//if
      
#if 0
      if(Protocol6::getCompressed(word1)){
	//Do funky stuff
      }//if
#endif
      
      Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
      
      Uint32 sBlockNum = signalHeader.theSendersBlockRef;
      sBlockNum = numberToRef(sBlockNum, remoteNodeId);
      signalHeader.theSendersBlockRef = sBlockNum;
      
      Uint8 prio = Protocol6::getPrio(word1);
      
      Uint32 * signalData = &readPtr[3];
      
      if(Protocol6::getSignalIdIncluded(word1) == 0){
	signalHeader.theSendersSignalId = ~0;
      } else {
	signalHeader.theSendersSignalId = * signalData;
	signalData ++;
      }//if

      Uint32 * sectionPtr = signalData + signalHeader.theLength;
      Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
      for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
	Uint32 sz = * sectionPtr;
	ptr[i].sz = sz;
	ptr[i].p = sectionData;
	
	sectionPtr ++;
	sectionData += sz;
      }
      
      execute(callbackObj, &signalHeader, prio, signalData, ptr);
      
      readPtr += messageLen32;
    }//while
  } else {
    /** state = HaltIO || state == HaltInput */

    while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
      Uint32 word1 = readPtr[0];
      Uint32 word2 = readPtr[1];
      Uint32 word3 = readPtr[2];
      loop_count++; 
#if 0
      if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
	//Do funky stuff
      }//if
#endif
      
      const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
      if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){
	DEBUG("Message Size(words) = " << messageLen32);
	reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
        return readPtr;
      }//if
      
      if(Protocol6::getCheckSumIncluded(word1)){
	const Uint32 tmpLen = messageLen32 - 1;
	const Uint32 checkSumSent     = readPtr[tmpLen];
	const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
	
	if(checkSumComputed != checkSumSent){
	  
	  //theTransporters[remoteNodeId]->disconnect();
	  reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM);
	  return readPtr;
	}//if
      }//if
      
#if 0
      if(Protocol6::getCompressed(word1)){
	//Do funky stuff
      }//if
#endif
      
      Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
      
      Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
      
      if(rBlockNum == 252){
	Uint32 sBlockNum = signalHeader.theSendersBlockRef;
	sBlockNum = numberToRef(sBlockNum, remoteNodeId);
	signalHeader.theSendersBlockRef = sBlockNum;
	
	Uint8 prio = Protocol6::getPrio(word1);
	
	Uint32 * signalData = &readPtr[3];
	
	if(Protocol6::getSignalIdIncluded(word1) == 0){
	  signalHeader.theSendersSignalId = ~0;
	} else {
	  signalHeader.theSendersSignalId = * signalData;
	  signalData ++;
	}//if
	
	Uint32 * sectionPtr = signalData + signalHeader.theLength;
	Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
	for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
	  Uint32 sz = * sectionPtr;
	  ptr[i].sz = sz;
	  ptr[i].p = sectionData;
	  
	  sectionPtr ++;
	  sectionData += sz;
	}

	execute(callbackObj, &signalHeader, prio, signalData, ptr);
      } else {
	DEBUG("prepareReceive(...) - Discarding message to block: "
	      << rBlockNum << " from Node: " << remoteNodeId);
      }//if
      
      readPtr += messageLen32;
    }//while
  }//if
  return readPtr;
}

Packer::Packer(bool signalId, bool checksum) {
  
  checksumUsed    = (checksum ? 1 : 0);
  signalIdUsed    = (signalId ? 1 : 0);
  
  // Set the priority

  preComputedWord1 = 0;
  Protocol6::setByteOrder(preComputedWord1, 0);
  Protocol6::setSignalIdIncluded(preComputedWord1, signalIdUsed);
  Protocol6::setCheckSumIncluded(preComputedWord1, checksumUsed);
  Protocol6::setCompressed(preComputedWord1, 0);
}

inline
void
import(Uint32 * & insertPtr, const LinearSectionPtr & ptr){
  const Uint32 sz = ptr.sz;
  memcpy(insertPtr, ptr.p, 4 * sz);
  insertPtr += sz;
}

void copy(Uint32 * & insertPtr, 
	  class SectionSegmentPool &, const SegmentedSectionPtr & ptr);

void
Packer::pack(Uint32 * insertPtr, 
	     Uint32 prio, 
	     const SignalHeader * header, 
	     const Uint32 * theData,
	     const LinearSectionPtr ptr[3]) const {
  Uint32 i;
  
  Uint32 dataLen32 = header->theLength;
  Uint32 no_segs = header->m_noOfSections;

  Uint32 len32 = 
    dataLen32 + no_segs + 
    checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
  

  for(i = 0; i<no_segs; i++){
    len32 += ptr[i].sz;
  }
  
  /**
   * Do insert of data
   */
  Uint32 word1 = preComputedWord1;
  Uint32 word2 = 0;
  Uint32 word3 = 0;
  
  Protocol6::setPrio(word1, prio);
  Protocol6::setMessageLength(word1, len32);
  Protocol6::createProtocol6Header(word1, word2, word3, header);

  insertPtr[0] = word1;
  insertPtr[1] = word2;
  insertPtr[2] = word3;
  
  Uint32 * tmpInserPtr = &insertPtr[3];
  
  if(signalIdUsed){
    * tmpInserPtr = header->theSignalId;
    tmpInserPtr++;
  }
  
  memcpy(tmpInserPtr, theData, 4 * dataLen32);

  tmpInserPtr += dataLen32;
  for(i = 0; i<no_segs; i++){
    tmpInserPtr[i] = ptr[i].sz;
  }

  tmpInserPtr += no_segs;
  for(i = 0; i<no_segs; i++){
    import(tmpInserPtr, ptr[i]);
  }
  
  if(checksumUsed){
    * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
  }
}

void
Packer::pack(Uint32 * insertPtr, 
	     Uint32 prio, 
	     const SignalHeader * header, 
	     const Uint32 * theData,
	     class SectionSegmentPool & thePool,
	     const SegmentedSectionPtr ptr[3]) const {
  Uint32 i;
  
  Uint32 dataLen32 = header->theLength;
  Uint32 no_segs = header->m_noOfSections;

  Uint32 len32 = 
    dataLen32 + no_segs + 
    checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
  
  for(i = 0; i<no_segs; i++){
    len32 += ptr[i].sz;
  }
  
  /**
   * Do insert of data
   */
  Uint32 word1 = preComputedWord1;
  Uint32 word2 = 0;
  Uint32 word3 = 0;
  
  Protocol6::setPrio(word1, prio);
  Protocol6::setMessageLength(word1, len32);
  Protocol6::createProtocol6Header(word1, word2, word3, header);

  insertPtr[0] = word1;
  insertPtr[1] = word2;
  insertPtr[2] = word3;
  
  Uint32 * tmpInserPtr = &insertPtr[3];
  
  if(signalIdUsed){
    * tmpInserPtr = header->theSignalId;
    tmpInserPtr++;
  }
  
  memcpy(tmpInserPtr, theData, 4 * dataLen32);
  
  tmpInserPtr += dataLen32;
  for(i = 0; i<no_segs; i++){
    tmpInserPtr[i] = ptr[i].sz;
  }

  tmpInserPtr += no_segs;
  for(i = 0; i<no_segs; i++){
    copy(tmpInserPtr, thePool, ptr[i]);
  }
  
  if(checksumUsed){
    * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
  }
}