restore of auto increment bug#5786

parent 77582df9
...@@ -88,7 +88,7 @@ RestoreMetaData::~RestoreMetaData(){ ...@@ -88,7 +88,7 @@ RestoreMetaData::~RestoreMetaData(){
allTables.clear(); allTables.clear();
} }
const TableS * TableS *
RestoreMetaData::getTable(Uint32 tableId) const { RestoreMetaData::getTable(Uint32 tableId) const {
for(Uint32 i= 0; i < allTables.size(); i++) for(Uint32 i= 0; i < allTables.size(); i++)
if(allTables[i]->getTableId() == tableId) if(allTables[i]->getTableId() == tableId)
...@@ -201,6 +201,8 @@ TableS::TableS(NdbTableImpl* tableImpl) ...@@ -201,6 +201,8 @@ TableS::TableS(NdbTableImpl* tableImpl)
{ {
m_dictTable = tableImpl; m_dictTable = tableImpl;
m_noOfNullable = m_nullBitmaskSize = 0; m_noOfNullable = m_nullBitmaskSize = 0;
m_auto_val_id= ~(Uint32)0;
m_max_auto_val= 0;
for (int i = 0; i < tableImpl->getNoOfColumns(); i++) for (int i = 0; i < tableImpl->getNoOfColumns(); i++)
createAttr(tableImpl->getColumn(i)); createAttr(tableImpl->getColumn(i));
...@@ -269,7 +271,7 @@ int TupleS::getNoOfAttributes() const { ...@@ -269,7 +271,7 @@ int TupleS::getNoOfAttributes() const {
return m_currentTable->getNoOfAttributes(); return m_currentTable->getNoOfAttributes();
}; };
const TableS * TupleS::getTable() const { TableS * TupleS::getTable() const {
return m_currentTable; return m_currentTable;
}; };
...@@ -282,7 +284,7 @@ AttributeData * TupleS::getData(int i) const{ ...@@ -282,7 +284,7 @@ AttributeData * TupleS::getData(int i) const{
}; };
bool bool
TupleS::prepareRecord(const TableS & tab){ TupleS::prepareRecord(TableS & tab){
if (allAttrData) { if (allAttrData) {
if (getNoOfAttributes() == tab.getNoOfAttributes()) if (getNoOfAttributes() == tab.getNoOfAttributes())
{ {
...@@ -698,6 +700,9 @@ void TableS::createAttr(NdbDictionary::Column *column) ...@@ -698,6 +700,9 @@ void TableS::createAttr(NdbDictionary::Column *column)
d->attrId = allAttributesDesc.size(); d->attrId = allAttributesDesc.size();
allAttributesDesc.push_back(d); allAttributesDesc.push_back(d);
if (d->m_column->getAutoIncrement())
m_auto_val_id= d->attrId;
if(d->m_column->getPrimaryKey() /* && not variable */) if(d->m_column->getPrimaryKey() /* && not variable */)
{ {
m_fixedKeys.push_back(d); m_fixedKeys.push_back(d);
......
...@@ -91,9 +91,9 @@ class TupleS { ...@@ -91,9 +91,9 @@ class TupleS {
private: private:
friend class RestoreDataIterator; friend class RestoreDataIterator;
const TableS *m_currentTable; TableS *m_currentTable;
AttributeData *allAttrData; AttributeData *allAttrData;
bool prepareRecord(const TableS &); bool prepareRecord(TableS &);
public: public:
TupleS() { TupleS() {
...@@ -108,7 +108,7 @@ public: ...@@ -108,7 +108,7 @@ public:
TupleS(const TupleS& tuple); // disable copy constructor TupleS(const TupleS& tuple); // disable copy constructor
TupleS & operator=(const TupleS& tuple); TupleS & operator=(const TupleS& tuple);
int getNoOfAttributes() const; int getNoOfAttributes() const;
const TableS * getTable() const; TableS * getTable() const;
const AttributeDesc * getDesc(int i) const; const AttributeDesc * getDesc(int i) const;
AttributeData * getData(int i) const; AttributeData * getData(int i) const;
}; // class TupleS }; // class TupleS
...@@ -130,6 +130,9 @@ class TableS { ...@@ -130,6 +130,9 @@ class TableS {
Uint32 m_noOfNullable; Uint32 m_noOfNullable;
Uint32 m_nullBitmaskSize; Uint32 m_nullBitmaskSize;
Uint32 m_auto_val_id;
Uint64 m_max_auto_val;
int pos; int pos;
void createAttr(NdbDictionary::Column *column); void createAttr(NdbDictionary::Column *column);
...@@ -170,6 +173,42 @@ public: ...@@ -170,6 +173,42 @@ public:
return allAttributesDesc.size(); return allAttributesDesc.size();
}; };
bool have_auto_inc() const {
return m_auto_val_id != ~(Uint32)0;
};
bool have_auto_inc(Uint32 id) const {
return m_auto_val_id == id;
};
Uint64 get_max_auto_val() const {
return m_max_auto_val;
};
void update_max_auto_val(const char *data, int size) {
Uint64 val= 0;
switch(size){
case 8:
val= *(Uint8*)data;
break;
case 16:
val= *(Uint16*)data;
break;
case 24:
val= (0xffffff)&*(Uint32*)data;
break;
case 32:
val= *(Uint32*)data;
break;
case 64:
val= *(Uint64*)data;
break;
default:
return;
};
if(val > m_max_auto_val)
m_max_auto_val= val;
};
/** /**
* Get attribute descriptor * Get attribute descriptor
*/ */
...@@ -245,7 +284,7 @@ public: ...@@ -245,7 +284,7 @@ public:
Uint32 getNoOfTables() const { return allTables.size();} Uint32 getNoOfTables() const { return allTables.size();}
const TableS * operator[](int i) const { return allTables[i];} const TableS * operator[](int i) const { return allTables[i];}
const TableS * getTable(Uint32 tableId) const; TableS * getTable(Uint32 tableId) const;
Uint32 getStopGCP() const; Uint32 getStopGCP() const;
}; // RestoreMetaData }; // RestoreMetaData
...@@ -254,7 +293,7 @@ public: ...@@ -254,7 +293,7 @@ public:
class RestoreDataIterator : public BackupFile { class RestoreDataIterator : public BackupFile {
const RestoreMetaData & m_metaData; const RestoreMetaData & m_metaData;
Uint32 m_count; Uint32 m_count;
const TableS* m_currentTable; TableS* m_currentTable;
TupleS m_tuple; TupleS m_tuple;
public: public:
...@@ -278,7 +317,7 @@ public: ...@@ -278,7 +317,7 @@ public:
LE_UPDATE LE_UPDATE
}; };
EntryType m_type; EntryType m_type;
const TableS * m_table; TableS * m_table;
Vector<AttributeS*> m_values; Vector<AttributeS*> m_values;
Vector<AttributeS*> m_values_e; Vector<AttributeS*> m_values_e;
AttributeS *add_attr() { AttributeS *add_attr() {
......
...@@ -30,6 +30,7 @@ public: ...@@ -30,6 +30,7 @@ public:
virtual void endOfTuples(){} virtual void endOfTuples(){}
virtual void logEntry(const LogEntry &){} virtual void logEntry(const LogEntry &){}
virtual void endOfLogEntrys(){} virtual void endOfLogEntrys(){}
virtual bool finalize_table(const TableS &){return true;}
}; };
#endif #endif
...@@ -130,6 +130,21 @@ BackupRestore::get_table(const NdbDictionary::Table* tab){ ...@@ -130,6 +130,21 @@ BackupRestore::get_table(const NdbDictionary::Table* tab){
return m_cache.m_new_table; return m_cache.m_new_table;
} }
bool
BackupRestore::finalize_table(const TableS & table){
bool ret= true;
if (!m_restore && !m_restore_meta)
return ret;
if (table.have_auto_inc())
{
Uint64 max_val= table.get_max_auto_val();
Uint64 auto_val= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable));
if (max_val+1 > auto_val || auto_val == ~(Uint64)0)
ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false);
}
return ret;
}
bool bool
BackupRestore::table(const TableS & table){ BackupRestore::table(const TableS & table){
if (!m_restore && !m_restore_meta) if (!m_restore && !m_restore_meta)
...@@ -179,6 +194,9 @@ BackupRestore::table(const TableS & table){ ...@@ -179,6 +194,9 @@ BackupRestore::table(const TableS & table){
err << "Unable to find table: " << split[2].c_str() << endl; err << "Unable to find table: " << split[2].c_str() << endl;
return false; return false;
} }
if(m_restore_meta){
m_ndb->setAutoIncrementValue(tab, ~(Uint64)0, false);
}
const NdbDictionary::Table* null = 0; const NdbDictionary::Table* null = 0;
m_new_tables.fill(table.m_dictTable->getTableId(), null); m_new_tables.fill(table.m_dictTable->getTableId(), null);
m_new_tables[table.m_dictTable->getTableId()] = tab; m_new_tables[table.m_dictTable->getTableId()] = tab;
...@@ -316,6 +334,10 @@ void BackupRestore::tuple_a(restore_callback_t *cb) ...@@ -316,6 +334,10 @@ void BackupRestore::tuple_a(restore_callback_t *cb)
int arraySize = attr_desc->arraySize; int arraySize = attr_desc->arraySize;
char * dataPtr = attr_data->string_value; char * dataPtr = attr_data->string_value;
Uint32 length = (size * arraySize) / 8; Uint32 length = (size * arraySize) / 8;
if (j == 0 && tup.getTable()->have_auto_inc(i))
tup.getTable()->update_max_auto_val(dataPtr,size);
if (attr_desc->m_column->getPrimaryKey()) if (attr_desc->m_column->getPrimaryKey())
{ {
if (j == 1) continue; if (j == 1) continue;
...@@ -510,8 +532,11 @@ BackupRestore::logEntry(const LogEntry & tup) ...@@ -510,8 +532,11 @@ BackupRestore::logEntry(const LogEntry & tup)
int arraySize = attr->Desc->arraySize; int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value; const char * dataPtr = attr->Data.string_value;
if (tup.m_table->have_auto_inc(attr->Desc->attrId))
tup.m_table->update_max_auto_val(dataPtr,size);
const Uint32 length = (size / 8) * arraySize; const Uint32 length = (size / 8) * arraySize;
if (attr->Desc->m_column->getPrimaryKey()) if (attr->Desc->m_column->getPrimaryKey())
op->equal(attr->Desc->attrId, dataPtr, length); op->equal(attr->Desc->attrId, dataPtr, length);
else else
op->setValue(attr->Desc->attrId, dataPtr, length); op->setValue(attr->Desc->attrId, dataPtr, length);
......
...@@ -59,6 +59,7 @@ public: ...@@ -59,6 +59,7 @@ public:
virtual void endOfTuples(); virtual void endOfTuples();
virtual void logEntry(const LogEntry &); virtual void logEntry(const LogEntry &);
virtual void endOfLogEntrys(); virtual void endOfLogEntrys();
virtual bool finalize_table(const TableS &);
void connectToMysql(); void connectToMysql();
Ndb * m_ndb; Ndb * m_ndb;
bool m_restore; bool m_restore;
......
...@@ -355,6 +355,20 @@ main(int argc, const char** argv) ...@@ -355,6 +355,20 @@ main(int argc, const char** argv)
logIter.validateFooter(); //not implemented logIter.validateFooter(); //not implemented
for (i= 0; i < g_consumers.size(); i++) for (i= 0; i < g_consumers.size(); i++)
g_consumers[i]->endOfLogEntrys(); g_consumers[i]->endOfLogEntrys();
for(i = 0; i<metaData.getNoOfTables(); i++)
{
if (checkSysTable(metaData[i]->getTableName()))
{
for(Uint32 j= 0; j < g_consumers.size(); j++)
if (!g_consumers[j]->finalize_table(* metaData[i]))
{
ndbout_c("Restore: Failed to finalize restore table: %s. "
"Exiting...",
metaData[i]->getTableName());
return -11;
}
}
}
} }
} }
clearConsumers(); clearConsumers();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment