Commit 567a31e3 authored by unknown's avatar unknown

ndb - wl#2972 fix tinyblob


storage/ndb/src/ndbapi/NdbBlob.cpp:
  tinyblob of course has no blob table and no blob op
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp:
  tinyblob of course has no blob table and no blob op
storage/ndb/test/ndbapi/test_event_merge.cpp:
  tinyblob of course has no blob table and no blob op
parent 30877fc5
...@@ -1269,6 +1269,8 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, ...@@ -1269,6 +1269,8 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
// prepare blob column and table // prepare blob column and table
if (prepareColumn() == -1) if (prepareColumn() == -1)
DBUG_RETURN(-1); DBUG_RETURN(-1);
// tinyblob sanity
assert((theBlobEventOp == NULL) == (theBlobTable == NULL));
// extra buffers // extra buffers
theBlobEventDataBuf.alloc(thePartSize); theBlobEventDataBuf.alloc(thePartSize);
// prepare receive of head+inline // prepare receive of head+inline
...@@ -1278,6 +1280,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, ...@@ -1278,6 +1280,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
// prepare receive of blob part // prepare receive of blob part
if (theBlobEventOp != NULL) {
if ((theBlobEventPkRecAttr = if ((theBlobEventPkRecAttr =
theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0), theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
theKeyBuf.data, version)) == NULL || theKeyBuf.data, version)) == NULL ||
...@@ -1293,6 +1296,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, ...@@ -1293,6 +1296,7 @@ NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp,
setErrorCode(theBlobEventOp); setErrorCode(theBlobEventOp);
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
}
setState(Prepared); setState(Prepared);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -318,13 +318,18 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n) ...@@ -318,13 +318,18 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
tBlob = tBlob->theNext; tBlob = tBlob->theNext;
} }
NdbEventOperationImpl* tBlobOp = NULL;
const bool is_tinyblob = (tAttrInfo->getPartSize() == 0);
assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL));
if (! is_tinyblob) {
// blob event name // blob event name
char bename[MAX_TAB_NAME_SIZE]; char bename[MAX_TAB_NAME_SIZE];
NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo); NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);
// find blob event op if any (it serves both post and pre handles) // find blob event op if any (it serves both post and pre handles)
assert(tAttrInfo->m_blobTable != NULL); tBlobOp = theBlobOpList;
NdbEventOperationImpl* tBlobOp = theBlobOpList;
NdbEventOperationImpl* tLastBlopOp = NULL; NdbEventOperationImpl* tLastBlopOp = NULL;
while (tBlobOp != NULL) { while (tBlobOp != NULL) {
if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) { if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
...@@ -355,6 +360,7 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n) ...@@ -355,6 +360,7 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
tLastBlopOp->theNextBlobOp = tBlobOp; tLastBlopOp->theNextBlobOp = tBlobOp;
tBlobOp->theNextBlobOp = NULL; tBlobOp->theNextBlobOp = NULL;
} }
}
tBlob = m_ndb->getNdbBlob(); tBlob = m_ndb->getNdbBlob();
if (tBlob == NULL) if (tBlob == NULL)
......
...@@ -203,7 +203,9 @@ struct Col { ...@@ -203,7 +203,9 @@ struct Col {
uint length; uint length;
uint size; uint size;
bool isblob() const { bool isblob() const {
return type == NdbDictionary::Column::Text; return
type == NdbDictionary::Column::Text ||
type == NdbDictionary::Column::Blob;
} }
}; };
...@@ -213,19 +215,21 @@ static Col g_col[] = { ...@@ -213,19 +215,21 @@ static Col g_col[] = {
{ 2, "seq", NdbDictionary::Column::Unsigned, false, false, 1, 4 }, { 2, "seq", NdbDictionary::Column::Unsigned, false, false, 1, 4 },
{ 3, "cc1", NdbDictionary::Column::Char, false, true, g_charlen, g_charlen }, { 3, "cc1", NdbDictionary::Column::Char, false, true, g_charlen, g_charlen },
{ 4, "tx1", NdbDictionary::Column::Text, false, true, 0, 0 }, { 4, "tx1", NdbDictionary::Column::Text, false, true, 0, 0 },
{ 5, "tx2", NdbDictionary::Column::Text, false, true, 0, 0 } { 5, "tx2", NdbDictionary::Column::Text, false, true, 0, 0 },
{ 6, "bl1", NdbDictionary::Column::Blob, false, true, 0, 0 } // tinyblob
}; };
static const uint g_maxcol = sizeof(g_col)/sizeof(g_col[0]); static const uint g_maxcol = sizeof(g_col)/sizeof(g_col[0]);
static const uint g_blobcols = 3;
static uint static uint
ncol() ncol()
{ {
uint n = g_maxcol; uint n = g_maxcol;
if (g_opts.no_blobs) if (g_opts.no_blobs)
n -= 2; n -= g_blobcols;
else if (g_opts.one_blob) else if (g_opts.one_blob)
n -= 1; n -= (g_blobcols - 1);
return n; return n;
} }
...@@ -278,6 +282,11 @@ createtable() ...@@ -278,6 +282,11 @@ createtable()
col.setStripeSize(g_blobstripesize); col.setStripeSize(g_blobstripesize);
col.setCharset(cs); col.setCharset(cs);
break; break;
case NdbDictionary::Column::Blob:
col.setInlineSize(g_blobinlinesize);
col.setPartSize(0);
col.setStripeSize(0);
break;
default: default:
assert(false); assert(false);
break; break;
...@@ -332,6 +341,7 @@ struct Data { ...@@ -332,6 +341,7 @@ struct Data {
char cc1[g_charlen + 1]; char cc1[g_charlen + 1];
Txt tx1; Txt tx1;
Txt tx2; Txt tx2;
Txt bl1;
Ptr ptr[g_maxcol]; Ptr ptr[g_maxcol];
int ind[g_maxcol]; // -1 = no data, 1 = NULL, 0 = not NULL int ind[g_maxcol]; // -1 = no data, 1 = NULL, 0 = not NULL
uint noop; // bit: omit in NdbOperation (implicit NULL INS or no UPD) uint noop; // bit: omit in NdbOperation (implicit NULL INS or no UPD)
...@@ -342,14 +352,15 @@ struct Data { ...@@ -342,14 +352,15 @@ struct Data {
memset(pk2, 0, sizeof(pk2)); memset(pk2, 0, sizeof(pk2));
seq = 0; seq = 0;
memset(cc1, 0, sizeof(cc1)); memset(cc1, 0, sizeof(cc1));
tx1.val = tx2.val = 0; tx1.val = tx2.val = bl1.val = 0;
tx1.len = tx2.len = 0; tx1.len = tx2.len = bl1.len = 0;
ptr[0].u32 = &pk1; ptr[0].u32 = &pk1;
ptr[1].ch = pk2; ptr[1].ch = pk2;
ptr[2].u32 = &seq; ptr[2].u32 = &seq;
ptr[3].ch = cc1; ptr[3].ch = cc1;
ptr[4].txt = &tx1; ptr[4].txt = &tx1;
ptr[5].txt = &tx2; ptr[5].txt = &tx2;
ptr[6].txt = &bl1;
for (i = 0; i < g_maxcol; i++) for (i = 0; i < g_maxcol; i++)
ind[i] = -1; ind[i] = -1;
noop = 0; noop = 0;
...@@ -358,6 +369,7 @@ struct Data { ...@@ -358,6 +369,7 @@ struct Data {
void free() { void free() {
delete [] tx1.val; delete [] tx1.val;
delete [] tx2.val; delete [] tx2.val;
delete [] bl1.val;
init(); init();
} }
}; };
...@@ -379,6 +391,7 @@ cmpcol(const Col& c, const Data& d1, const Data& d2) ...@@ -379,6 +391,7 @@ cmpcol(const Col& c, const Data& d1, const Data& d2)
return 1; return 1;
break; break;
case NdbDictionary::Column::Text: case NdbDictionary::Column::Text:
case NdbDictionary::Column::Blob:
{ {
const Data::Txt& t1 = *d1.ptr[i].txt; const Data::Txt& t1 = *d1.ptr[i].txt;
const Data::Txt& t2 = *d2.ptr[i].txt; const Data::Txt& t2 = *d2.ptr[i].txt;
...@@ -429,6 +442,7 @@ operator<<(NdbOut& out, const Data& d) ...@@ -429,6 +442,7 @@ operator<<(NdbOut& out, const Data& d)
} }
break; break;
case NdbDictionary::Column::Text: case NdbDictionary::Column::Text:
case NdbDictionary::Column::Blob:
{ {
Data::Txt& t = *d.ptr[i].txt; Data::Txt& t = *d.ptr[i].txt;
bool first = true; bool first = true;
...@@ -1071,19 +1085,21 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t) ...@@ -1071,19 +1085,21 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
} }
break; break;
case NdbDictionary::Column::Text: case NdbDictionary::Column::Text:
case NdbDictionary::Column::Blob:
{ {
const bool tinyblob = (c.type == NdbDictionary::Column::Blob);
Data::Txt& t = *d.ptr[i].txt; Data::Txt& t = *d.ptr[i].txt;
delete [] t.val; delete [] t.val;
t.val = 0; t.val = 0;
if (g_opts.tweak & 1) { if (g_opts.tweak & 1) {
uint u = 256 + 2000; uint u = g_blobinlinesize + (tinyblob ? 0 : g_blobpartsize);
uint v = (g_opts.tweak & 2) ? 0 : urandom(strlen(g_charval)); uint v = (g_opts.tweak & 2) ? 0 : urandom(strlen(g_charval));
t.val = new char [u]; t.val = new char [u];
t.len = u; t.len = u;
memset(t.val, g_charval[v], u); memset(t.val, g_charval[v], u);
break; break;
} }
uint u = urandom(g_maxblobsize); uint u = urandom(tinyblob ? g_blobinlinesize : g_maxblobsize);
u = urandom(u); // 4x bias for smaller blobs u = urandom(u); // 4x bias for smaller blobs
u = urandom(u); u = urandom(u);
t.val = new char [u]; t.val = new char [u];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment