Commit ed0c2955 authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new-ndb

into  poseidon.mysql.com:/home/tomas/mysql-5.1-new-ndb

parents bea6fb17 867e6dc6
......@@ -215,8 +215,6 @@ NdbEventOperationImpl::~NdbEventOperationImpl()
DBUG_VOID_RETURN;
stop();
// m_bufferHandle->dropSubscribeEvent(m_bufferId);
; // ToDo? We should send stop signal here
if (theMainOp == NULL)
{
......@@ -428,7 +426,7 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
// create blob event operation
tBlobOp =
m_ndb->theEventBuffer->createEventOperation(*blobEvnt, m_error);
m_ndb->theEventBuffer->createEventOperationImpl(*blobEvnt, m_error);
if (tBlobOp == NULL)
DBUG_RETURN(NULL);
......@@ -561,6 +559,10 @@ NdbEventOperationImpl::execute_nolock()
m_state= EO_EXECUTING;
mi_type= m_eventImpl->mi_type;
m_ndb->theEventBuffer->add_op();
// add kernel reference
// removed on TE_STOP, TE_CLUSTER_FAILURE, or error below
m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
int r= NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
if (r == 0) {
if (theMainOp == NULL) {
......@@ -568,19 +570,31 @@ NdbEventOperationImpl::execute_nolock()
NdbEventOperationImpl* blob_op = theBlobOpList;
while (blob_op != NULL) {
r = blob_op->execute_nolock();
if (r != 0)
break;
if (r != 0) {
// since main op is running and possibly some blob ops as well
// we can't just reset the main op. Instead return with error,
// main op (and blob ops) will be cleaned up when user calls
// dropEventOperation
m_error.code= myDict->getNdbError().code;
DBUG_RETURN(r);
}
// add blob reference to main op
// removed by TE_STOP or TE_CLUSTER_FAILURE
m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
blob_op = blob_op->m_next;
}
}
if (r == 0)
{
m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
DBUG_RETURN(0);
}
}
//Error
// Error
// remove kernel reference
// added above
m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", m_ref_count, this));
m_state= EO_ERROR;
mi_type= 0;
m_magic_number= 0;
......@@ -1222,6 +1236,8 @@ NdbEventBuffer::nextEvent()
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
while (gci_ops && op->getGCI() > gci_ops->m_gci)
{
// moved to next gci, check if any references have been
// released when completing the last gci
deleteUsedEventOperations();
gci_ops = m_available_data.next_gci_ops();
}
......@@ -1249,6 +1265,8 @@ NdbEventBuffer::nextEvent()
#endif
// free all "per gci unique" collected operations
// completed gci, check if any references have been
// released when completing the gci
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
while (gci_ops)
{
......@@ -1285,6 +1303,8 @@ NdbEventBuffer::deleteUsedEventOperations()
{
NdbEventOperationImpl *op = &op_f->m_impl;
DBUG_ASSERT(op->m_ref_count > 0);
// remove gci reference
// added in inserDataL
op->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
if (op->m_ref_count == 0)
......@@ -1582,6 +1602,33 @@ NdbEventBuffer::complete_outof_order_gcis()
ndbout_c("complete_outof_order_gcis: m_latestGCI: %lld", m_latestGCI);
}
void
NdbEventBuffer::insert_event(NdbEventOperationImpl* impl,
SubTableData &data,
LinearSectionPtr *ptr,
Uint32 &oid_ref)
{
NdbEventOperationImpl *dropped_ev_op = m_dropped_ev_op;
do
{
do
{
oid_ref = impl->m_oid;
insertDataL(impl, &data, ptr);
NdbEventOperationImpl* blob_op = impl->theBlobOpList;
while (blob_op != NULL)
{
oid_ref = blob_op->m_oid;
insertDataL(blob_op, &data, ptr);
blob_op = blob_op->m_next;
}
} while((impl = impl->m_next));
impl = dropped_ev_op;
dropped_ev_op = NULL;
} while (impl);
}
void
NdbEventBuffer::report_node_connected(Uint32 node_id)
{
......@@ -1606,21 +1653,8 @@ NdbEventBuffer::report_node_connected(Uint32 node_id)
/**
* Insert this event for each operation
*/
{
// no need to lock()/unlock(), receive thread calls this
NdbEventOperationImpl* impl = &op->m_impl;
do if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
} while((impl = impl->m_next));
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
}
}
// no need to lock()/unlock(), receive thread calls this
insert_event(&op->m_impl, data, ptr, data.senderData);
DBUG_VOID_RETURN;
}
......@@ -1648,21 +1682,8 @@ NdbEventBuffer::report_node_failure(Uint32 node_id)
/**
* Insert this event for each operation
*/
{
// no need to lock()/unlock(), receive thread calls this
NdbEventOperationImpl* impl = &op->m_impl;
do if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
} while((impl = impl->m_next));
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
}
}
// no need to lock()/unlock(), receive thread calls this
insert_event(&op->m_impl, data, ptr, data.senderData);
DBUG_VOID_RETURN;
}
......@@ -1693,21 +1714,8 @@ NdbEventBuffer::completeClusterFailed()
/**
* Insert this event for each operation
*/
{
// no need to lock()/unlock(), receive thread calls this
NdbEventOperationImpl* impl = &op->m_impl;
do if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
} while((impl = impl->m_next));
for (impl = m_dropped_ev_op; impl; impl = impl->m_next)
if (!impl->m_node_bit_mask.isclear())
{
data.senderData = impl->m_oid;
insertDataL(impl, &data, ptr);
}
}
// no need to lock()/unlock(), receive thread calls this
insert_event(&op->m_impl, data, ptr, data.senderData);
/**
* Release all GCI's with m_gci > gci
......@@ -1797,25 +1805,60 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
{
case NdbDictionary::Event::_TE_NODE_FAILURE:
op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri));
DBUG_PRINT("info",
("_TE_NODE_FAILURE: m_ref_count: %u for op: %p id: %u",
op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
break;
case NdbDictionary::Event::_TE_ACTIVE:
op->m_node_bit_mask.set(SubTableData::getNdbdNodeId(ri));
// internal event, do not relay to user
DBUG_PRINT("info",
("_TE_ACTIVE: m_ref_count: %u for op: %p id: %u",
op->m_ref_count, op, SubTableData::getNdbdNodeId(ri)));
DBUG_RETURN_EVENT(0);
break;
case NdbDictionary::Event::_TE_CLUSTER_FAILURE:
op->m_node_bit_mask.clear();
DBUG_ASSERT(op->m_ref_count > 0);
op->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
if (!op->m_node_bit_mask.isclear())
{
op->m_node_bit_mask.clear();
DBUG_ASSERT(op->m_ref_count > 0);
// remove kernel reference
// added in execute_nolock
op->m_ref_count--;
DBUG_PRINT("info", ("_TE_CLUSTER_FAILURE: m_ref_count: %u for op: %p",
op->m_ref_count, op));
if (op->theMainOp)
{
DBUG_ASSERT(op->m_ref_count == 0);
DBUG_ASSERT(op->theMainOp->m_ref_count > 0);
// remove blob reference in main op
// added in execute_no_lock
op->theMainOp->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
op->theMainOp->m_ref_count, op->theMainOp));
}
}
break;
case NdbDictionary::Event::_TE_STOP:
op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri));
if (op->m_node_bit_mask.isclear())
{
DBUG_ASSERT(op->m_ref_count > 0);
// remove kernel reference
// added in execute_no_lock
op->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
DBUG_PRINT("info", ("_TE_STOP: m_ref_count: %u for op: %p",
op->m_ref_count, op));
if (op->theMainOp)
{
DBUG_ASSERT(op->m_ref_count == 0);
DBUG_ASSERT(op->theMainOp->m_ref_count > 0);
// remove blob reference in main op
// added in execute_no_lock
op->theMainOp->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
op->theMainOp->m_ref_count, op->theMainOp));
}
}
break;
default:
......@@ -2564,6 +2607,8 @@ EventBufData_list::add_gci_op(Gci_op g)
#ifndef DBUG_OFF
i = m_gci_op_count;
#endif
// add gci reference
// removed in deleteUsedOperations
g.op->m_ref_count++;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", g.op->m_ref_count, g.op));
m_gci_op_list[m_gci_op_count++] = g;
......@@ -2632,6 +2677,8 @@ NdbEventBuffer::createEventOperation(const char* eventName,
delete tOp;
DBUG_RETURN(NULL);
}
// add user reference
// removed in dropEventOperation
getEventOperationImpl(tOp)->m_ref_count = 1;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p",
getEventOperationImpl(tOp)->m_ref_count, getEventOperationImpl(tOp)));
......@@ -2639,10 +2686,10 @@ NdbEventBuffer::createEventOperation(const char* eventName,
}
NdbEventOperationImpl*
NdbEventBuffer::createEventOperation(NdbEventImpl& evnt,
NdbError &theError)
NdbEventBuffer::createEventOperationImpl(NdbEventImpl& evnt,
NdbError &theError)
{
DBUG_ENTER("NdbEventBuffer::createEventOperation [evnt]");
DBUG_ENTER("NdbEventBuffer::createEventOperationImpl");
NdbEventOperationImpl* tOp= new NdbEventOperationImpl(m_ndb, evnt);
if (tOp == 0)
{
......@@ -2684,6 +2731,9 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
}
DBUG_ASSERT(op->m_ref_count > 0);
// remove user reference
// added in createEventOperation
// user error to use reference after this
op->m_ref_count--;
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
if (op->m_ref_count == 0)
......
......@@ -400,7 +400,59 @@ public:
Uint32 m_eventId;
Uint32 m_oid;
/*
m_node_bit_mask keeps track of which ndb nodes have reference to
an event op
- add - TE_ACTIVE
- remove - TE_STOP, TE_NODE_FAILURE, TE_CLUSTER_FAILURE
TE_NODE_FAILURE and TE_CLUSTER_FAILURE are created as events
and added to all event ops listed as active or pending delete
in m_dropped_ev_op using insertDataL, includeing the blob
event ops referenced by a regular event op.
- NdbEventBuffer::report_node_failure
- NdbEventBuffer::completeClusterFailed
TE_ACTIVE is sent from the kernel on initial execute/start of the
event op, but is also internally generetad on node connect like
TE_NODE_FAILURE and TE_CLUSTER_FAILURE
- NdbEventBuffer::report_node_connected
when m_node_bit_mask becomes clear, the kernel reference is
removed from m_ref_count
*/
Bitmask<(unsigned int)_NDB_NODE_BITMASK_SIZE> m_node_bit_mask;
/*
m_ref_count keeps track of outstanding references to an event
operation impl object. To make sure that the object is not
deleted too early.
If on dropEventOperation there are still references to an
object it is queued for delete in NdbEventBuffer::m_dropped_ev_op
the following references exists for a _non_ blob event op:
* user reference
- add - NdbEventBuffer::createEventOperation
- remove - NdbEventBuffer::dropEventOperation
* kernel reference
- add - execute_nolock
- remove - TE_STOP, TE_CLUSTER_FAILURE
* blob reference
- add - execute_nolock on blob event
- remove - TE_STOP, TE_CLUSTER_FAILURE on blob event
* gci reference
- add - insertDataL/add_gci_op
- remove - NdbEventBuffer::deleteUsedEventOperations
the following references exists for a blob event op:
* kernel reference
- add - execute_nolock
- remove - TE_STOP, TE_CLUSTER_FAILURE
*/
int m_ref_count;
bool m_mergeEvents;
......@@ -436,8 +488,8 @@ public:
Vector<Gci_container_pod> m_active_gci;
NdbEventOperation *createEventOperation(const char* eventName,
NdbError &);
NdbEventOperationImpl *createEventOperation(NdbEventImpl& evnt,
NdbError &);
NdbEventOperationImpl *createEventOperationImpl(NdbEventImpl& evnt,
NdbError &);
void dropEventOperation(NdbEventOperation *);
static NdbEventOperationImpl* getEventOperationImpl(NdbEventOperation* tOp);
......@@ -541,6 +593,11 @@ public:
#endif
private:
void insert_event(NdbEventOperationImpl* impl,
SubTableData &data,
LinearSectionPtr *ptr,
Uint32 &oid_ref);
int expand(unsigned sz);
// all allocated data
......@@ -552,8 +609,14 @@ private:
Vector<EventBufData_chunk *> m_allocated_data;
unsigned m_sz;
// dropped event operations that have not yet
// been deleted
/*
dropped event operations (dropEventOperation) that have not yet
been deleted because of outstanding m_ref_count
check for delete is done on occations when the ref_count may have
changed by calling deleteUsedEventOperations:
- nextEvent - each time the user has completed processing a gci
*/
NdbEventOperationImpl *m_dropped_ev_op;
Uint32 m_active_op_count;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment