Implemented fragmented signal from API

primarily to enable creation of tables with more attributes than 91 (up to 128)

Intermediate push for testing
CHUNK_SZ will be set higher in real implementation and API_TRACE added
parent cce8f544
...@@ -475,7 +475,8 @@ TransporterFacade::TransporterFacade() : ...@@ -475,7 +475,8 @@ TransporterFacade::TransporterFacade() :
theTransporterRegistry(0), theTransporterRegistry(0),
theStopReceive(0), theStopReceive(0),
theSendThread(NULL), theSendThread(NULL),
theReceiveThread(NULL) theReceiveThread(NULL),
m_fragmented_signal_id(0)
{ {
theOwnId = 0; theOwnId = 0;
...@@ -833,6 +834,105 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ ...@@ -833,6 +834,105 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
return (ss == SEND_OK ? 0 : -1); return (ss == SEND_OK ? 0 : -1);
} }
#define CHUNK_SZ 100u
int
TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
LinearSectionPtr ptr[3], Uint32 secs){
NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
LinearSectionPtr tmp_ptr[3];
Uint32 unique_id= m_fragmented_signal_id++; // next unique id
unsigned i;
for (i= 0; i < secs; i++)
tmp_ptr[i]= ptr[i];
unsigned start_i= 0;
unsigned chunk_sz= 0;
unsigned fragment_info= 0;
Uint32 *tmp_data= tmp_signal.getDataPtrSend();
for (i= 0; i < secs;) {
unsigned save_sz= tmp_ptr[i].sz;
tmp_data[i-start_i]= i;
if (chunk_sz + save_sz > CHUNK_SZ) {
// truncate
unsigned send_sz= CHUNK_SZ - chunk_sz;
tmp_ptr[i].sz= send_sz;
if (fragment_info < 2)
fragment_info++;
// send tmp_signal
tmp_data[i-start_i+1]= unique_id;
tmp_signal.setLength(i-start_i+2);
tmp_signal.m_fragmentInfo= fragment_info;
// do prepare send
{
int ret;
if(getIsNodeSendable(aNode) == true){
SendStatus ss = theTransporterRegistry->prepareSend
(&tmp_signal,
1, // JBB
tmp_signal.getDataPtrSend(),
aNode,
&ptr[start_i]);
assert(ss != SEND_MESSAGE_TOO_BIG);
ret = (ss == SEND_OK ? 0 : -1);
} else
ret = -1;
if (ret != SEND_OK)
return ret;
}
// setup variables for next signal
start_i= i;
chunk_sz= 0;
tmp_ptr[i].sz= save_sz-send_sz;
tmp_ptr[i].p+= send_sz;
}
else
{
chunk_sz+= save_sz;
i++;
}
}
unsigned a_sz= aSignal->getLength();
if (fragment_info > 0) {
// update the original signal to include section info
Uint32 *a_data= aSignal->getDataPtrSend();
unsigned tmp_sz= i-start_i;
memcpy(a_data+a_sz,
tmp_data,
tmp_sz*sizeof(Uint32));
a_data[a_sz+tmp_sz]= unique_id;
aSignal->setLength(a_sz+tmp_sz+1);
// send last fragment
aSignal->m_fragmentInfo= 3;
aSignal->m_noOfSections= i-start_i;
} else {
aSignal->m_noOfSections= secs;
}
// send aSignal
int ret;
if(getIsNodeSendable(aNode) == true){
SendStatus ss = theTransporterRegistry->prepareSend
(aSignal,
1, // JBB
aSignal->getDataPtrSend(),
aNode,
&ptr[start_i]);
assert(ss != SEND_MESSAGE_TOO_BIG);
ret = (ss == SEND_OK ? 0 : -1);
} else
ret = -1;
aSignal->m_noOfSections = 0;
aSignal->m_fragmentInfo = 0;
aSignal->setLength(a_sz);
return ret;
}
#if 0
int int
TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
LinearSectionPtr ptr[3], Uint32 secs){ LinearSectionPtr ptr[3], Uint32 secs){
...@@ -864,7 +964,7 @@ TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, ...@@ -864,7 +964,7 @@ TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
aSignal->m_noOfSections = 0; aSignal->m_noOfSections = 0;
return -1; return -1;
} }
#endif
int int
...@@ -886,11 +986,10 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal, ...@@ -886,11 +986,10 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal,
aSignal->theSendersBlockRef = tmp; aSignal->theSendersBlockRef = tmp;
} }
#endif #endif
SendStatus ss = theTransporterRegistry->prepareSend(aSignal, SendStatus ss =
1, // JBB theTransporterRegistry->prepareSend(aSignal, 1, // JBB
aSignal->getDataPtrSend(), aSignal->getDataPtrSend(),
aNode, aNode, ptr);
ptr);
assert(ss != SEND_MESSAGE_TOO_BIG); assert(ss != SEND_MESSAGE_TOO_BIG);
aSignal->m_noOfSections = 0; aSignal->m_noOfSections = 0;
return (ss == SEND_OK ? 0 : -1); return (ss == SEND_OK ? 0 : -1);
......
...@@ -224,7 +224,8 @@ private: ...@@ -224,7 +224,8 @@ private:
} m_threads; } m_threads;
Uint32 m_max_trans_id; Uint32 m_max_trans_id;
Uint32 m_fragmented_signal_id;
/** /**
* execute function * execute function
*/ */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment