Commit 99165bb1 authored by jonas@perch.ndb.mysql.com's avatar jonas@perch.ndb.mysql.com

Merge joreland@bk-internal.mysql.com:/home/bk/mysql-5.1-new-ndb

into  perch.ndb.mysql.com:/home/jonas/src/mysql-5.1-new-ndb
parents 9de965b3 3600a25c
...@@ -5016,12 +5016,13 @@ void Dblqh::packLqhkeyreqLab(Signal* signal) ...@@ -5016,12 +5016,13 @@ void Dblqh::packLqhkeyreqLab(Signal* signal)
Uint32 nextNodeId = regTcPtr->nextReplica; Uint32 nextNodeId = regTcPtr->nextReplica;
Uint32 nextVersion = getNodeInfo(nextNodeId).m_version; Uint32 nextVersion = getNodeInfo(nextNodeId).m_version;
UintR TAiLen = regTcPtr->reclenAiLqhkey;
UintR TapplAddressIndicator = (regTcPtr->nextSeqNoReplica == 0 ? 0 : 1); UintR TapplAddressIndicator = (regTcPtr->nextSeqNoReplica == 0 ? 0 : 1);
LqhKeyReq::setApplicationAddressFlag(Treqinfo, TapplAddressIndicator); LqhKeyReq::setApplicationAddressFlag(Treqinfo, TapplAddressIndicator);
LqhKeyReq::setInterpretedFlag(Treqinfo, regTcPtr->opExec); LqhKeyReq::setInterpretedFlag(Treqinfo, regTcPtr->opExec);
LqhKeyReq::setSeqNoReplica(Treqinfo, regTcPtr->nextSeqNoReplica); LqhKeyReq::setSeqNoReplica(Treqinfo, regTcPtr->nextSeqNoReplica);
LqhKeyReq::setAIInLqhKeyReq(Treqinfo, regTcPtr->reclenAiLqhkey); LqhKeyReq::setAIInLqhKeyReq(Treqinfo, TAiLen);
if (unlikely(nextVersion < NDBD_ROWID_VERSION)) if (unlikely(nextVersion < NDBD_ROWID_VERSION))
{ {
...@@ -5124,22 +5125,32 @@ void Dblqh::packLqhkeyreqLab(Signal* signal) ...@@ -5124,22 +5125,32 @@ void Dblqh::packLqhkeyreqLab(Signal* signal)
lqhKeyReq->variableData[nextPos + 0] = sig0; lqhKeyReq->variableData[nextPos + 0] = sig0;
nextPos += LqhKeyReq::getGCIFlag(Treqinfo); nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
sig0 = regTcPtr->firstAttrinfo[0];
sig1 = regTcPtr->firstAttrinfo[1];
sig2 = regTcPtr->firstAttrinfo[2];
sig3 = regTcPtr->firstAttrinfo[3];
sig4 = regTcPtr->firstAttrinfo[4];
UintR TAiLen = regTcPtr->reclenAiLqhkey;
BlockReference lqhRef = calcLqhBlockRef(regTcPtr->nextReplica); BlockReference lqhRef = calcLqhBlockRef(regTcPtr->nextReplica);
if (likely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength <= 25))
{
jam();
sig0 = regTcPtr->firstAttrinfo[0];
sig1 = regTcPtr->firstAttrinfo[1];
sig2 = regTcPtr->firstAttrinfo[2];
sig3 = regTcPtr->firstAttrinfo[3];
sig4 = regTcPtr->firstAttrinfo[4];
lqhKeyReq->variableData[nextPos] = sig0; lqhKeyReq->variableData[nextPos] = sig0;
lqhKeyReq->variableData[nextPos + 1] = sig1; lqhKeyReq->variableData[nextPos + 1] = sig1;
lqhKeyReq->variableData[nextPos + 2] = sig2; lqhKeyReq->variableData[nextPos + 2] = sig2;
lqhKeyReq->variableData[nextPos + 3] = sig3; lqhKeyReq->variableData[nextPos + 3] = sig3;
lqhKeyReq->variableData[nextPos + 4] = sig4; lqhKeyReq->variableData[nextPos + 4] = sig4;
nextPos += TAiLen; nextPos += TAiLen;
TAiLen = 0;
}
else
{
Treqinfo &= ~(Uint32)(RI_AI_IN_THIS_MASK << RI_AI_IN_THIS_SHIFT);
lqhKeyReq->requestInfo = Treqinfo;
}
sendSignal(lqhRef, GSN_LQHKEYREQ, signal, sendSignal(lqhRef, GSN_LQHKEYREQ, signal,
nextPos + LqhKeyReq::FixedSignalLength, JBB); nextPos + LqhKeyReq::FixedSignalLength, JBB);
if (regTcPtr->primKeyLen > 4) { if (regTcPtr->primKeyLen > 4) {
...@@ -5165,6 +5176,17 @@ void Dblqh::packLqhkeyreqLab(Signal* signal) ...@@ -5165,6 +5176,17 @@ void Dblqh::packLqhkeyreqLab(Signal* signal)
signal->theData[0] = sig0; signal->theData[0] = sig0;
signal->theData[1] = sig1; signal->theData[1] = sig1;
signal->theData[2] = sig2; signal->theData[2] = sig2;
if (unlikely(nextPos + TAiLen + LqhKeyReq::FixedSignalLength > 25))
{
jam();
/**
* 4 replicas...
*/
memcpy(signal->theData+3, regTcPtr->firstAttrinfo, TAiLen << 2);
sendSignal(lqhRef, GSN_ATTRINFO, signal, 3 + TAiLen, JBB);
}
AttrbufPtr regAttrinbufptr; AttrbufPtr regAttrinbufptr;
regAttrinbufptr.i = regTcPtr->firstAttrinbuf; regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
while (regAttrinbufptr.i != RNIL) { while (regAttrinbufptr.i != RNIL) {
......
...@@ -312,11 +312,12 @@ void AsyncFile::openReq(Request* request) ...@@ -312,11 +312,12 @@ void AsyncFile::openReq(Request* request)
Uint32 new_flags = 0; Uint32 new_flags = 0;
// Convert file open flags from Solaris to Liux // Convert file open flags from Solaris to Liux
if(flags & FsOpenReq::OM_CREATE){ if (flags & FsOpenReq::OM_CREATE)
{
new_flags |= O_CREAT; new_flags |= O_CREAT;
} }
if(flags & FsOpenReq::OM_TRUNCATE){ if (flags & FsOpenReq::OM_TRUNCATE){
#if 0 #if 0
if(Global_unlinkO_CREAT){ if(Global_unlinkO_CREAT){
unlink(theFileName.c_str()); unlink(theFileName.c_str());
...@@ -330,25 +331,25 @@ void AsyncFile::openReq(Request* request) ...@@ -330,25 +331,25 @@ void AsyncFile::openReq(Request* request)
m_syncFrequency = 1024*1024; // Hard coded to 1M m_syncFrequency = 1024*1024; // Hard coded to 1M
} }
if(flags & FsOpenReq::OM_APPEND){ if (flags & FsOpenReq::OM_APPEND){
new_flags |= O_APPEND; new_flags |= O_APPEND;
} }
if((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT)) if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT))
{ {
#ifdef O_SYNC #ifdef O_SYNC
new_flags |= O_SYNC; new_flags |= O_SYNC;
#endif #endif
} }
#ifndef NDB_NO_O_DIRECT /* to allow tmpfs */ //#ifndef NDB_NO_O_DIRECT /* to allow tmpfs */
#ifdef O_DIRECT #ifdef O_DIRECT
if (flags & FsOpenReq::OM_DIRECT) if (flags & FsOpenReq::OM_DIRECT)
{ {
new_flags |= O_DIRECT; new_flags |= O_DIRECT;
} }
#endif #endif
#endif //#endif
switch(flags & 0x3){ switch(flags & 0x3){
case FsOpenReq::OM_READONLY: case FsOpenReq::OM_READONLY:
...@@ -370,44 +371,73 @@ void AsyncFile::openReq(Request* request) ...@@ -370,44 +371,73 @@ void AsyncFile::openReq(Request* request)
const int mode = S_IRUSR | S_IWUSR | const int mode = S_IRUSR | S_IWUSR |
S_IRGRP | S_IWGRP | S_IRGRP | S_IWGRP |
S_IROTH | S_IWOTH; S_IROTH | S_IWOTH;
if(flags & FsOpenReq::OM_CREATE_IF_NONE){ if (flags & FsOpenReq::OM_CREATE_IF_NONE)
if((theFd = ::open(theFileName.c_str(), new_flags, mode)) != -1) { {
Uint32 tmp_flags = new_flags;
#ifdef O_DIRECT
tmp_flags &= ~O_DIRECT;
#endif
if ((theFd = ::open(theFileName.c_str(), tmp_flags, mode)) != -1)
{
close(theFd); close(theFd);
request->error = FsRef::fsErrFileExists; request->error = FsRef::fsErrFileExists;
return; return;
} }
new_flags |= O_CREAT; new_flags |= O_CREAT;
} }
if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) { no_odirect:
if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode)))
{
PRINT_ERRORANDFLAGS(new_flags); PRINT_ERRORANDFLAGS(new_flags);
if( (errno == ENOENT ) && (new_flags & O_CREAT ) ) { if ((errno == ENOENT) && (new_flags & O_CREAT))
{
createDirectories(); createDirectories();
if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode))) { if (-1 == (theFd = ::open(theFileName.c_str(), new_flags, mode)))
{
#ifdef O_DIRECT
if (new_flags & O_DIRECT)
{
new_flags &= ~O_DIRECT;
goto no_odirect;
}
#endif
PRINT_ERRORANDFLAGS(new_flags); PRINT_ERRORANDFLAGS(new_flags);
request->error = errno; request->error = errno;
return; return;
} }
} else { }
#ifdef O_DIRECT
else if (new_flags & O_DIRECT)
{
new_flags &= ~O_DIRECT;
goto no_odirect;
}
#endif
else
{
request->error = errno; request->error = errno;
return; return;
} }
} }
if(flags & FsOpenReq::OM_CHECK_SIZE) if (flags & FsOpenReq::OM_CHECK_SIZE)
{ {
struct stat buf; struct stat buf;
if((fstat(theFd, &buf) == -1)) if ((fstat(theFd, &buf) == -1))
{ {
request->error = errno; request->error = errno;
} else if(buf.st_size != request->par.open.file_size){ }
else if(buf.st_size != request->par.open.file_size)
{
request->error = FsRef::fsErrInvalidFileSize; request->error = FsRef::fsErrInvalidFileSize;
} }
if(request->error) if (request->error)
return; return;
} }
if(flags & FsOpenReq::OM_INIT){ if (flags & FsOpenReq::OM_INIT)
{
off_t off = 0; off_t off = 0;
const off_t sz = request->par.open.file_size; const off_t sz = request->par.open.file_size;
Uint32 tmp[sizeof(SignalHeader)+25]; Uint32 tmp[sizeof(SignalHeader)+25];
......
...@@ -1279,6 +1279,23 @@ find_bucket(Vector<Gci_container> * active, Uint64 gci) ...@@ -1279,6 +1279,23 @@ find_bucket(Vector<Gci_container> * active, Uint64 gci)
return find_bucket_chained(active,gci); return find_bucket_chained(active,gci);
} }
static
void
crash_on_invalid_SUB_GCP_COMPLETE_REP(const Gci_container* bucket,
const SubGcpCompleteRep * const rep,
Uint32 nodes)
{
Uint32 old_cnt = bucket->m_gcp_complete_rep_count;
ndbout_c("INVALID SUB_GCP_COMPLETE_REP");
ndbout_c("gci: %d", rep->gci);
ndbout_c("sender: %x", rep->senderRef);
ndbout_c("count: %d", rep->gcp_complete_rep_count);
ndbout_c("bucket count: %u", old_cnt);
ndbout_c("nodes: %u", nodes);
abort();
}
void void
NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep) NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
{ {
...@@ -1317,9 +1334,13 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep) ...@@ -1317,9 +1334,13 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
old_cnt = m_system_nodes; old_cnt = m_system_nodes;
} }
assert(old_cnt >= cnt); //assert(old_cnt >= cnt);
if (unlikely(! (old_cnt >= cnt)))
{
crash_on_invalid_SUB_GCP_COMPLETE_REP(bucket, rep, m_system_nodes);
}
bucket->m_gcp_complete_rep_count = old_cnt - cnt; bucket->m_gcp_complete_rep_count = old_cnt - cnt;
if(old_cnt == cnt) if(old_cnt == cnt)
{ {
if(likely(gci == m_latestGCI + 1 || m_latestGCI == 0)) if(likely(gci == m_latestGCI + 1 || m_latestGCI == 0))
......
...@@ -1435,8 +1435,7 @@ NdbTransaction::sendTC_COMMIT_ACK(TransporterFacade *tp, ...@@ -1435,8 +1435,7 @@ NdbTransaction::sendTC_COMMIT_ACK(TransporterFacade *tp,
Uint32 * dataPtr = aSignal->getDataPtrSend(); Uint32 * dataPtr = aSignal->getDataPtrSend();
dataPtr[0] = transId1; dataPtr[0] = transId1;
dataPtr[1] = transId2; dataPtr[1] = transId2;
tp->sendSignalUnCond(aSignal, refToNode(aTCRef));
tp->sendSignal(aSignal, refToNode(aTCRef));
} }
int int
......
...@@ -343,7 +343,7 @@ execute(void * callbackObj, SignalHeader * const header, ...@@ -343,7 +343,7 @@ execute(void * callbackObj, SignalHeader * const header,
Uint32 aNodeId= refToNode(ref); Uint32 aNodeId= refToNode(ref);
tSignal.theReceiversBlockNumber= refToBlock(ref); tSignal.theReceiversBlockNumber= refToBlock(ref);
tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK; tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
theFacade->sendSignal(&tSignal, aNodeId); theFacade->sendSignalUnCond(&tSignal, aNodeId);
} }
break; break;
} }
...@@ -987,7 +987,7 @@ TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){ ...@@ -987,7 +987,7 @@ TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){
LinearSectionPtr ptr[3]; LinearSectionPtr ptr[3];
signalLogger.sendSignal(* aSignal, signalLogger.sendSignal(* aSignal,
1, 1,
aSignal->getDataPtr(), tDataPtr,
aNode, ptr, 0); aNode, ptr, 0);
signalLogger.flushSignalLog(); signalLogger.flushSignalLog();
aSignal->theSendersBlockRef = tmp; aSignal->theSendersBlockRef = tmp;
...@@ -1014,6 +1014,7 @@ TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){ ...@@ -1014,6 +1014,7 @@ TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){
int int
TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
Uint32* tDataPtr = aSignal->getDataPtrSend();
#ifdef API_TRACE #ifdef API_TRACE
if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){ if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
Uint32 tmp = aSignal->theSendersBlockRef; Uint32 tmp = aSignal->theSendersBlockRef;
...@@ -1021,7 +1022,7 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ ...@@ -1021,7 +1022,7 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
LinearSectionPtr ptr[3]; LinearSectionPtr ptr[3];
signalLogger.sendSignal(* aSignal, signalLogger.sendSignal(* aSignal,
0, 0,
aSignal->getDataPtr(), tDataPtr,
aNode, ptr, 0); aNode, ptr, 0);
signalLogger.flushSignalLog(); signalLogger.flushSignalLog();
aSignal->theSendersBlockRef = tmp; aSignal->theSendersBlockRef = tmp;
...@@ -1032,7 +1033,7 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ ...@@ -1032,7 +1033,7 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
(aSignal->theReceiversBlockNumber != 0)); (aSignal->theReceiversBlockNumber != 0));
SendStatus ss = theTransporterRegistry->prepareSend(aSignal, SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
0, 0,
aSignal->getDataPtr(), tDataPtr,
aNode, aNode,
0); 0);
......
...@@ -175,7 +175,8 @@ private: ...@@ -175,7 +175,8 @@ private:
friend class GrepSS; friend class GrepSS;
friend class Ndb; friend class Ndb;
friend class Ndb_cluster_connection_impl; friend class Ndb_cluster_connection_impl;
friend class NdbTransaction;
int sendSignalUnCond(NdbApiSignal *, NodeId nodeId); int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
bool isConnected(NodeId aNodeId); bool isConnected(NodeId aNodeId);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment