Commit 5c0012cb authored by pekka@mysql.com's avatar pekka@mysql.com

ndb - bug#19201 (4.1), see comment in NdbBlob.cpp

parent b337e83d
...@@ -481,14 +481,22 @@ msg text NOT NULL ...@@ -481,14 +481,22 @@ msg text NOT NULL
insert into t1 (msg) values( insert into t1 (msg) values(
'Tries to validate (8 byte length + inline bytes) as UTF8 :( 'Tries to validate (8 byte length + inline bytes) as UTF8 :(
Fast fix: removed validation for Text. It is not yet indexable Fast fix: removed validation for Text. It is not yet indexable
so bad data will not crash kernel. so bad data will not crash kernel.');
Proper fix: Set inline bytes to multiple of mbmaxlen and
validate it (after the 8 byte length).');
select * from t1; select * from t1;
id msg id msg
1 Tries to validate (8 byte length + inline bytes) as UTF8 :( 1 Tries to validate (8 byte length + inline bytes) as UTF8 :(
Fast fix: removed validation for Text. It is not yet indexable Fast fix: removed validation for Text. It is not yet indexable
so bad data will not crash kernel. so bad data will not crash kernel.
Proper fix: Set inline bytes to multiple of mbmaxlen and drop table t1;
validate it (after the 8 byte length). create table t1 (
a int primary key not null auto_increment,
b text
) engine=ndbcluster;
select count(*) from t1;
count(*)
500
truncate t1;
select count(*) from t1;
count(*)
0
drop table t1; drop table t1;
...@@ -403,10 +403,29 @@ create table t1 ( ...@@ -403,10 +403,29 @@ create table t1 (
insert into t1 (msg) values( insert into t1 (msg) values(
'Tries to validate (8 byte length + inline bytes) as UTF8 :( 'Tries to validate (8 byte length + inline bytes) as UTF8 :(
Fast fix: removed validation for Text. It is not yet indexable Fast fix: removed validation for Text. It is not yet indexable
so bad data will not crash kernel. so bad data will not crash kernel.');
Proper fix: Set inline bytes to multiple of mbmaxlen and
validate it (after the 8 byte length).');
select * from t1; select * from t1;
drop table t1; drop table t1;
# -- bug #19201
create table t1 (
a int primary key not null auto_increment,
b text
) engine=ndbcluster;
--disable_query_log
set autocommit=1;
# more rows than batch size (64)
# for this bug no blob parts would be necessary
let $1 = 500;
while ($1)
{
insert into t1 (b) values (repeat('x',4000));
dec $1;
}
--enable_query_log
select count(*) from t1;
truncate t1;
select count(*) from t1;
drop table t1;
# End of 4.1 tests # End of 4.1 tests
...@@ -39,6 +39,7 @@ class TcKeyReq { ...@@ -39,6 +39,7 @@ class TcKeyReq {
friend class NdbOperation; friend class NdbOperation;
friend class NdbIndexOperation; friend class NdbIndexOperation;
friend class NdbScanOperation; friend class NdbScanOperation;
friend class NdbBlob;
friend class DbUtil; friend class DbUtil;
/** /**
......
...@@ -275,6 +275,7 @@ private: ...@@ -275,6 +275,7 @@ private:
bool isWriteOp(); bool isWriteOp();
bool isDeleteOp(); bool isDeleteOp();
bool isScanOp(); bool isScanOp();
bool isTakeOverOp();
// computations // computations
Uint32 getPartNumber(Uint64 pos); Uint32 getPartNumber(Uint64 pos);
Uint32 getPartCount(); Uint32 getPartCount();
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include <NdbBlob.hpp> #include <NdbBlob.hpp>
#include "NdbBlobImpl.hpp" #include "NdbBlobImpl.hpp"
#include <NdbScanOperation.hpp> #include <NdbScanOperation.hpp>
#include <signaldata/TcKeyReq.hpp>
#ifdef NDB_BLOB_DEBUG #ifdef NDB_BLOB_DEBUG
#define DBG(x) \ #define DBG(x) \
...@@ -290,6 +291,13 @@ NdbBlob::isScanOp() ...@@ -290,6 +291,13 @@ NdbBlob::isScanOp()
theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest; theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest;
} }
inline bool
NdbBlob::isTakeOverOp()
{
return
TcKeyReq::getTakeOverScanFlag(theNdbOp->theScanInfo);
}
// computations (inline) // computations (inline)
inline Uint32 inline Uint32
...@@ -1218,8 +1226,22 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch) ...@@ -1218,8 +1226,22 @@ NdbBlob::preExecute(ExecType anExecType, bool& batch)
if (isUpdateOp() || isWriteOp() || isDeleteOp()) { if (isUpdateOp() || isWriteOp() || isDeleteOp()) {
// add operation before this one to read head+inline // add operation before this one to read head+inline
NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp); NdbOperation* tOp = theNdbCon->getNdbOperation(theTable, theNdbOp);
/*
* If main op is from take over scan lock, the added read is done
* as committed read:
*
* In normal transactional case, the row is locked by us and
* committed read returns same as normal read.
*
* In current TRUNCATE TABLE, the deleting trans is committed in
* batches and then restarted with new trans id. A normal read
* would hang on the scan delete lock and then fail.
*/
NdbOperation::LockMode lockMode =
! isTakeOverOp() ?
NdbOperation::LM_Read : NdbOperation::LM_CommittedRead;
if (tOp == NULL || if (tOp == NULL ||
tOp->readTuple() == -1 || tOp->readTuple(lockMode) == -1 ||
setTableKeyValue(tOp) == -1 || setTableKeyValue(tOp) == -1 ||
getHeadInlineValue(tOp) == -1) { getHeadInlineValue(tOp) == -1) {
setErrorCode(tOp); setErrorCode(tOp);
......
...@@ -45,6 +45,7 @@ struct Opt { ...@@ -45,6 +45,7 @@ struct Opt {
bool m_dbg; bool m_dbg;
bool m_dbgall; bool m_dbgall;
const char* m_dbug; const char* m_dbug;
bool m_fac;
bool m_full; bool m_full;
unsigned m_loop; unsigned m_loop;
unsigned m_parts; unsigned m_parts;
...@@ -73,6 +74,7 @@ struct Opt { ...@@ -73,6 +74,7 @@ struct Opt {
m_dbg(false), m_dbg(false),
m_dbgall(false), m_dbgall(false),
m_dbug(0), m_dbug(0),
m_fac(false),
m_full(false), m_full(false),
m_loop(1), m_loop(1),
m_parts(10), m_parts(10),
...@@ -111,6 +113,7 @@ printusage() ...@@ -111,6 +113,7 @@ printusage()
<< " -dbg print debug" << endl << " -dbg print debug" << endl
<< " -dbgall print also NDB API debug (if compiled in)" << endl << " -dbgall print also NDB API debug (if compiled in)" << endl
<< " -dbug opt dbug options" << endl << " -dbug opt dbug options" << endl
<< " -fac fetch across commit in scan delete [" << d.m_fac << "]" << endl
<< " -full read/write only full blob values" << endl << " -full read/write only full blob values" << endl
<< " -loop N loop N times 0=forever [" << d.m_loop << "]" << endl << " -loop N loop N times 0=forever [" << d.m_loop << "]" << endl
<< " -parts N max parts in blob value [" << d.m_parts << "]" << endl << " -parts N max parts in blob value [" << d.m_parts << "]" << endl
...@@ -1260,23 +1263,11 @@ deleteScan(bool idx) ...@@ -1260,23 +1263,11 @@ deleteScan(bool idx)
CHK((ret = rs->nextResult(false)) == 0 || ret == 1 || ret == 2); CHK((ret = rs->nextResult(false)) == 0 || ret == 1 || ret == 2);
if (++n == g_opt.m_batch || ret == 2) { if (++n == g_opt.m_batch || ret == 2) {
DBG("execute batch: n=" << n << " ret=" << ret); DBG("execute batch: n=" << n << " ret=" << ret);
switch (0) { if (! g_opt.m_fac) {
case 0: // works normally
CHK(g_con->execute(NoCommit) == 0); CHK(g_con->execute(NoCommit) == 0);
CHK(true || g_con->restart() == 0); } else {
break;
case 1: // nonsense - g_con is invalid for 2nd batch
CHK(g_con->execute(Commit) == 0);
CHK(true || g_con->restart() == 0);
break;
case 2: // DBTC sendSignalErrorRefuseLab
CHK(g_con->execute(NoCommit) == 0);
CHK(g_con->restart() == 0);
break;
case 3: // 266 time-out
CHK(g_con->execute(Commit) == 0); CHK(g_con->execute(Commit) == 0);
CHK(g_con->restart() == 0); CHK(g_con->restart() == 0);
break;
} }
n = 0; n = 0;
} }
...@@ -1824,6 +1815,10 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535) ...@@ -1824,6 +1815,10 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
continue; continue;
} }
} }
if (strcmp(arg, "-fac") == 0) {
g_opt.m_fac = true;
continue;
}
if (strcmp(arg, "-full") == 0) { if (strcmp(arg, "-full") == 0) {
g_opt.m_full = true; g_opt.m_full = true;
continue; continue;
......
...@@ -23,17 +23,21 @@ ...@@ -23,17 +23,21 @@
#include <NDBT.hpp> #include <NDBT.hpp>
static int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, static int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab,
bool commit_across_open_cursor, int parallelism=240); bool fetch_across_commit, int parallelism=240);
NDB_STD_OPTS_VARS; NDB_STD_OPTS_VARS;
static const char* _dbname = "TEST_DB"; static const char* _dbname = "TEST_DB";
static my_bool _transactional = false;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
NDB_STD_OPTS("ndb_desc"), NDB_STD_OPTS("ndb_desc"),
{ "database", 'd', "Name of database table is in", { "database", 'd', "Name of database table is in",
(gptr*) &_dbname, (gptr*) &_dbname, 0, (gptr*) &_dbname, (gptr*) &_dbname, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "transactional", 't', "Single transaction (may run out of operations)",
(gptr*) &_transactional, (gptr*) &_transactional, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
}; };
static void usage() static void usage()
...@@ -82,18 +86,11 @@ int main(int argc, char** argv){ ...@@ -82,18 +86,11 @@ int main(int argc, char** argv){
ndbout << " Table " << argv[i] << " does not exist!" << endl; ndbout << " Table " << argv[i] << " does not exist!" << endl;
return NDBT_ProgramExit(NDBT_WRONGARGS); return NDBT_ProgramExit(NDBT_WRONGARGS);
} }
// Check if we have any blobs ndbout << "Deleting all from " << argv[i];
bool commit_across_open_cursor = true; if (! _transactional)
for (int j = 0; j < pTab->getNoOfColumns(); j++) { ndbout << " (non-transactional)";
NdbDictionary::Column::Type t = pTab->getColumn(j)->getType(); ndbout << " ...";
if (t == NdbDictionary::Column::Blob || if(clear_table(&MyNdb, pTab, ! _transactional) == NDBT_FAILED){
t == NdbDictionary::Column::Text) {
commit_across_open_cursor = false;
break;
}
}
ndbout << "Deleting all from " << argv[i] << "...";
if(clear_table(&MyNdb, pTab, commit_across_open_cursor) == NDBT_FAILED){
res = NDBT_FAILED; res = NDBT_FAILED;
ndbout << "FAILED" << endl; ndbout << "FAILED" << endl;
} }
...@@ -103,7 +100,7 @@ int main(int argc, char** argv){ ...@@ -103,7 +100,7 @@ int main(int argc, char** argv){
int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab,
bool commit_across_open_cursor, int parallelism) bool fetch_across_commit, int parallelism)
{ {
// Scan all records exclusive and delete // Scan all records exclusive and delete
// them one by one // them one by one
...@@ -134,7 +131,7 @@ int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, ...@@ -134,7 +131,7 @@ int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab,
} }
goto failed; goto failed;
} }
pOp = pTrans->getNdbScanOperation(pTab->getName()); pOp = pTrans->getNdbScanOperation(pTab->getName());
if (pOp == NULL) { if (pOp == NULL) {
goto failed; goto failed;
...@@ -165,7 +162,7 @@ int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, ...@@ -165,7 +162,7 @@ int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab,
} while((check = rs->nextResult(false)) == 0); } while((check = rs->nextResult(false)) == 0);
if(check != -1){ if(check != -1){
if (commit_across_open_cursor) { if (fetch_across_commit) {
check = pTrans->execute(Commit); check = pTrans->execute(Commit);
pTrans->restart(); // new tx id pTrans->restart(); // new tx id
} else { } else {
...@@ -196,7 +193,7 @@ int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab, ...@@ -196,7 +193,7 @@ int clear_table(Ndb* pNdb, const NdbDictionary::Table* pTab,
} }
goto failed; goto failed;
} }
if (! commit_across_open_cursor && pTrans->execute(Commit) != 0) { if (! fetch_across_commit && pTrans->execute(Commit) != 0) {
err = pTrans->getNdbError(); err = pTrans->getNdbError();
goto failed; goto failed;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment