Commit 60d3e60e authored by joreland@mysql.com's avatar joreland@mysql.com

ndb - Embryo of overload protection

  Add method to query free send buffer size
parent 5189e9dd
...@@ -179,6 +179,13 @@ public: ...@@ -179,6 +179,13 @@ public:
bool createTransporter(struct SHM_TransporterConfiguration * config); bool createTransporter(struct SHM_TransporterConfiguration * config);
bool createTransporter(struct OSE_TransporterConfiguration * config); bool createTransporter(struct OSE_TransporterConfiguration * config);
/**
* Get free buffer space
*
* Get #free bytes in send buffer for <em>node</node>
*/
Uint32 get_free_buffer(Uint32 node) const ;
/** /**
* prepareSend * prepareSend
* *
......
...@@ -1023,7 +1023,8 @@ SCI_Transporter::initSCI() { ...@@ -1023,7 +1023,8 @@ SCI_Transporter::initSCI() {
DBUG_RETURN(true); DBUG_RETURN(true);
} }
Uint32
SCI_Transporter::get_free_buffer() const
{
return (m_TargetSegm[m_ActiveAdapterId].writer)->get_free_buffer();
}
...@@ -134,6 +134,7 @@ public: ...@@ -134,6 +134,7 @@ public:
*/ */
bool getConnectionStatus(); bool getConnectionStatus();
virtual Uint32 get_free_buffer() const;
private: private:
SCI_Transporter(TransporterRegistry &t_reg, SCI_Transporter(TransporterRegistry &t_reg,
const char *local_host, const char *local_host,
......
...@@ -157,6 +157,7 @@ public: ...@@ -157,6 +157,7 @@ public:
inline Uint32 getWriteIndex() const { return m_writeIndex;} inline Uint32 getWriteIndex() const { return m_writeIndex;}
inline Uint32 getBufferSize() const { return m_bufferSize;} inline Uint32 getBufferSize() const { return m_bufferSize;}
inline Uint32 get_free_buffer() const;
inline void copyIndexes(SHM_Writer * standbyWriter); inline void copyIndexes(SHM_Writer * standbyWriter);
...@@ -213,4 +214,20 @@ SHM_Writer::updateWritePtr(Uint32 sz){ ...@@ -213,4 +214,20 @@ SHM_Writer::updateWritePtr(Uint32 sz){
* m_sharedWriteIndex = tWriteIndex; * m_sharedWriteIndex = tWriteIndex;
} }
inline
Uint32
SHM_Writer::get_free_buffer() const
{
Uint32 tReadIndex = * m_sharedReadIndex;
Uint32 tWriteIndex = m_writeIndex;
Uint32 free;
if(tReadIndex <= tWriteIndex){
free = m_bufferSize + tReadIndex - tWriteIndex;
} else {
free = tReadIndex - tWriteIndex;
}
return free;
}
#endif #endif
...@@ -362,3 +362,9 @@ SHM_Transporter::doSend() ...@@ -362,3 +362,9 @@ SHM_Transporter::doSend()
kill(m_remote_pid, g_ndb_shm_signum); kill(m_remote_pid, g_ndb_shm_signum);
} }
} }
Uint32
SHM_Transporter::get_free_buffer() const
{
return writer->get_free_buffer();
}
...@@ -138,6 +138,8 @@ protected: ...@@ -138,6 +138,8 @@ protected:
Uint32 m_last_signal; Uint32 m_last_signal;
Uint32 m_signal_threshold; Uint32 m_signal_threshold;
virtual Uint32 get_free_buffer() const;
private: private:
bool _shmSegCreated; bool _shmSegCreated;
bool _attached; bool _attached;
......
...@@ -60,7 +60,7 @@ SendBuffer::bufferSize() { ...@@ -60,7 +60,7 @@ SendBuffer::bufferSize() {
} }
Uint32 Uint32
SendBuffer::bufferSizeRemaining() { SendBuffer::bufferSizeRemaining() const {
return (sizeOfBuffer - dataSize); return (sizeOfBuffer - dataSize);
} }
......
...@@ -51,7 +51,7 @@ public: ...@@ -51,7 +51,7 @@ public:
bool initBuffer(Uint32 aRemoteNodeId); bool initBuffer(Uint32 aRemoteNodeId);
// Number of bytes remaining in the buffer // Number of bytes remaining in the buffer
Uint32 bufferSizeRemaining(); Uint32 bufferSizeRemaining() const;
// Number of bytes of data in the buffer // Number of bytes of data in the buffer
int bufferSize(); int bufferSize();
......
...@@ -250,6 +250,11 @@ TCP_Transporter::sendIsPossible(struct timeval * timeout) { ...@@ -250,6 +250,11 @@ TCP_Transporter::sendIsPossible(struct timeval * timeout) {
#endif #endif
} }
Uint32
TCP_Transporter::get_free_buffer() const
{
return m_sendBuffer.bufferSizeRemaining();
}
Uint32 * Uint32 *
TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){ TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
......
...@@ -99,6 +99,7 @@ private: ...@@ -99,6 +99,7 @@ private:
*/ */
virtual void updateReceiveDataPtr(Uint32 bytesRead); virtual void updateReceiveDataPtr(Uint32 bytesRead);
virtual Uint32 get_free_buffer() const;
protected: protected:
/** /**
* Setup client/server and perform connect/accept * Setup client/server and perform connect/accept
......
...@@ -69,6 +69,8 @@ public: ...@@ -69,6 +69,8 @@ public:
*/ */
NodeId getLocalNodeId() const; NodeId getLocalNodeId() const;
virtual Uint32 get_free_buffer() const = 0;
protected: protected:
Transporter(TransporterRegistry &, Transporter(TransporterRegistry &,
TransporterType, TransporterType,
......
...@@ -523,6 +523,18 @@ TransporterRegistry::removeTransporter(NodeId nodeId) { ...@@ -523,6 +523,18 @@ TransporterRegistry::removeTransporter(NodeId nodeId) {
theTransporters[nodeId] = NULL; theTransporters[nodeId] = NULL;
} }
Uint32
TransporterRegistry::get_free_buffer(Uint32 node) const
{
Transporter *t;
if(likely((t = theTransporters[node]) != 0))
{
return t->get_free_buffer();
}
return 0;
}
SendStatus SendStatus
TransporterRegistry::prepareSend(const SignalHeader * const signalHeader, TransporterRegistry::prepareSend(const SignalHeader * const signalHeader,
Uint8 prio, Uint8 prio,
......
...@@ -1656,7 +1656,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1656,7 +1656,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
false, false,
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
"256K", "256K",
"16K", "64K",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
{ {
...@@ -1844,7 +1844,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1844,7 +1844,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
false, false,
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
"1M", "1M",
"4K", "64K",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment