Commit 1ab953e3 authored by unknown's avatar unknown

ndb - add OM_AUTO_SYNC ta make sure os-kernel does not buffer too much

      add sync-flag to FsAppendReq


storage/ndb/include/kernel/signaldata/FsAppendReq.hpp:
  Add sync flag to FsAppend
storage/ndb/include/kernel/signaldata/FsOpenReq.hpp:
  Add auto sync flag to FSOPEN
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp:
  Add append_synch and auto sync
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp:
  Add variables for auto sync
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp:
  Add append_sync and auto sync
parent e9274862
......@@ -39,7 +39,7 @@ class FsAppendReq {
friend bool printFSAPPENDREQ(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo);
public:
STATIC_CONST( SignalLength = 6 );
STATIC_CONST( SignalLength = 7 );
private:
......@@ -52,6 +52,7 @@ private:
UintR varIndex; // DATA 3
UintR offset; // DATA 4
UintR size; // DATA 5
UintR synch_flag; // DATA 6
};
#endif
......@@ -53,7 +53,7 @@ public:
/**
* Length of signal
*/
STATIC_CONST( SignalLength = 10 );
STATIC_CONST( SignalLength = 11 );
SECTION( FILENAME = 0 );
private:
......@@ -69,6 +69,7 @@ private:
Uint32 page_size;
Uint32 file_size_hi;
Uint32 file_size_lo;
Uint32 auto_sync_size; // In bytes
STATIC_CONST( OM_READONLY = 0 );
STATIC_CONST( OM_WRITEONLY = 1 );
......@@ -80,10 +81,10 @@ private:
STATIC_CONST( OM_TRUNCATE = 0x200 );
STATIC_CONST( OM_AUTOSYNC = 0x400 );
STATIC_CONST( OM_CREATE_IF_NONE = 0x0400 );
STATIC_CONST( OM_INIT = 0x0800 ); //
STATIC_CONST( OM_CHECK_SIZE = 0x1000 );
STATIC_CONST( OM_DIRECT = 0x2000 );
STATIC_CONST( OM_CREATE_IF_NONE = 0x0800 );
STATIC_CONST( OM_INIT = 0x1000 ); //
STATIC_CONST( OM_CHECK_SIZE = 0x2000 );
STATIC_CONST( OM_DIRECT = 0x4000 );
enum Suffixes {
S_DATA = 0,
......
......@@ -219,6 +219,10 @@ AsyncFile::run()
case Request:: append:
appendReq(request);
break;
case Request:: append_synch:
appendReq(request);
syncReq(request);
break;
case Request::rmrf:
rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
break;
......@@ -246,9 +250,8 @@ extern Uint32 Global_syncFreq;
void AsyncFile::openReq(Request* request)
{
m_openedWithSync = false;
m_syncFrequency = 0;
m_syncCount= 0;
m_auto_sync_freq = 0;
m_write_wo_sync = 0;
// for open.flags, see signal FSOPENREQ
#ifdef NDB_WIN32
......@@ -329,7 +332,7 @@ void AsyncFile::openReq(Request* request)
if (flags & FsOpenReq::OM_AUTOSYNC)
{
m_syncFrequency = 1024*1024; // Hard coded to 1M
m_auto_sync_freq = request->par.open.auto_sync_size;
}
if (flags & FsOpenReq::OM_APPEND){
......@@ -429,7 +432,7 @@ void AsyncFile::openReq(Request* request)
{
request->error = errno;
}
else if(buf.st_size != request->par.open.file_size)
else if((Uint64)buf.st_size != request->par.open.file_size)
{
request->error = FsRef::fsErrInvalidFileSize;
}
......@@ -737,6 +740,10 @@ AsyncFile::writeReq( Request * request)
return;
}
} // while(write_not_complete)
if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
syncReq(request);
}
}
int
......@@ -746,6 +753,8 @@ AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
size_t bytes_to_write = chunk_size;
int return_value;
m_write_wo_sync += size;
#ifdef NDB_WIN32
DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
if(dwSFP != offset) {
......@@ -805,7 +814,6 @@ AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
}
#endif
m_syncCount+= bytes_written;
buf += bytes_written;
size -= bytes_written;
offset += bytes_written;
......@@ -856,8 +864,7 @@ bool AsyncFile::isOpen(){
void
AsyncFile::syncReq(Request * request)
{
if(m_openedWithSync ||
m_syncCount == 0){
if(m_auto_sync_freq && m_write_wo_sync == 0){
return;
}
#ifdef NDB_WIN32
......@@ -871,7 +878,7 @@ AsyncFile::syncReq(Request * request)
return;
}
#endif
m_syncCount = 0;
m_write_wo_sync = 0;
}
void
......@@ -880,7 +887,7 @@ AsyncFile::appendReq(Request * request){
const char * buf = request->par.append.buf;
Uint32 size = request->par.append.size;
m_syncCount += size;
m_write_wo_sync += size;
#ifdef NDB_WIN32
DWORD dwWritten = 0;
......@@ -912,7 +919,7 @@ AsyncFile::appendReq(Request * request){
}
#endif
if(m_syncFrequency != 0 && m_syncCount > m_syncFrequency){
if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
syncReq(request);
}
}
......
......@@ -123,6 +123,7 @@ public:
sync,
end,
append,
append_synch,
rmrf,
readPartial
};
......@@ -132,6 +133,7 @@ public:
Uint32 flags;
Uint32 page_size;
Uint64 file_size;
Uint32 auto_sync_size;
} open;
struct {
int numberOfPages;
......@@ -232,9 +234,8 @@ private:
int theWriteBufferSize;
char* theWriteBuffer;
bool m_openedWithSync;
Uint32 m_syncCount;
Uint32 m_syncFrequency;
size_t m_write_wo_sync; // Writes wo/ sync
size_t m_auto_sync_freq; // Auto sync freq in bytes
public:
SimulatedBlock& m_fs;
Ptr<GlobalPage> m_page_ptr;
......
......@@ -230,6 +230,7 @@ Ndbfs::execFSOPENREQ(Signal* signal)
request->par.open.file_size = fsOpenReq->file_size_hi;
request->par.open.file_size <<= 32;
request->par.open.file_size |= fsOpenReq->file_size_lo;
request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
ndbrequire(forward(file, request));
}
......@@ -567,6 +568,7 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
const Uint32 tSz = myBaseAddrRef->nrr;
const Uint32 offset = fsReq->offset;
const Uint32 size = fsReq->size;
const Uint32 synch_flag = fsReq->synch_flag;
Request *request = theRequestPool->get();
if (openFile == NULL) {
......@@ -596,12 +598,15 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
request->error = 0;
request->set(userRef, userPointer, filePointer);
request->file = openFile;
request->action = Request::append;
request->theTrace = signal->getTrace();
request->par.append.buf = (const char *)(tWA + offset);
request->par.append.size = size << 2;
if (!synch_flag)
request->action = Request::append;
else
request->action = Request::append_synch;
ndbrequire(forward(openFile, request));
return;
......@@ -744,7 +749,9 @@ Ndbfs::report(Request * request, Signal* signal)
sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
break;
}
case Request::append: {
case Request::append:
case Request::append_synch:
{
jam();
sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
break;
......@@ -814,7 +821,9 @@ Ndbfs::report(Request * request, Signal* signal)
sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB);
break;
}//case
case Request::append: {
case Request::append:
case Request::append_synch:
{
jam();
signal->theData[1] = request->par.append.size;
sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment