Commit aa10789f authored by Olivier Bertrand's avatar Olivier Bertrand

BSON development

parent 4eeadedc
......@@ -145,7 +145,7 @@ class BJSON : public BLOCK {
void SetBigint(PBVAL vlp, longlong ll);
void SetFloat(PBVAL vlp, double f);
void SetBool(PBVAL vlp, bool b);
void Clear(PBVAL vlp) { vlp->N = 0; vlp->Nd = 0; vlp->Next = 0; vlp->Type = TYPE_NULL; }
void Clear(PBVAL vlp) { vlp->N = 0; vlp->Nd = 0; vlp->Next = 0; }
bool IsValueNull(PBVAL vlp);
bool IsJson(PBVAL vlp) {return (vlp->Type == TYPE_JAR || vlp->Type == TYPE_JOB);}
......
......@@ -1737,7 +1737,6 @@ char *bfile_bjson(UDF_INIT *initid, UDF_ARGS *args, char *result,
ssize_t len, newloc;
size_t lrecl, binszp;
PBVAL jsp;
PBJNX bnxp;
PGLOBAL g = (PGLOBAL)initid->ptr;
BDOC doc(g);
......
......@@ -1668,13 +1668,14 @@ void BLKFAM::Rewind(void)
/***********************************************************************/
/* BIN GetFileLength: returns file size in number of bytes. */
/***********************************************************************/
int BINFAM::GetFileLength(PGLOBAL g) {
int BINFAM::GetFileLength(PGLOBAL g)
{
int len;
if (!BStream)
if (!Stream)
len = TXTFAM::GetFileLength(g);
else
if ((len = _filelength(_fileno(BStream))) < 0)
if ((len = _filelength(_fileno(Stream))) < 0)
sprintf(g->Message, MSG(FILELEN_ERROR), "_filelength", To_File);
xtrc(1, "File length=%d\n", len);
......@@ -1686,10 +1687,12 @@ int BINFAM::GetFileLength(PGLOBAL g) {
/* This function can be called with a null argument to test the */
/* availability of Cardinality implementation (1 yes, 0 no). */
/***********************************************************************/
int BINFAM::Cardinality(PGLOBAL g) {
int BINFAM::Cardinality(PGLOBAL g)
{
return (g) ? -1 : 0;
} // end of Cardinality
#if 0
/***********************************************************************/
/* OpenTableFile: Open a DOS/UNIX table file using C standard I/Os. */
/***********************************************************************/
......@@ -1713,16 +1716,16 @@ bool BINFAM::OpenTableFile(PGLOBAL g) {
// Now open the file stream
PlugSetPath(filename, To_File, Tdbp->GetPath());
if (!(BStream = PlugOpenFile(g, filename, opmode))) {
if (!(Stream = PlugOpenFile(g, filename, opmode))) {
if (trace(1))
htrc("%s\n", g->Message);
return (mode == MODE_READ && errno == ENOENT)
? PushWarning(g, Tdbp) : true;
} // endif BStream
} // endif Stream
if (trace(1))
htrc("File %s open BStream=%p mode=%s\n", filename, BStream, opmode);
htrc("File %s open Stream=%p mode=%s\n", filename, Stream, opmode);
To_Fb = dbuserp->Openlist; // Keep track of File block
......@@ -1731,12 +1734,14 @@ bool BINFAM::OpenTableFile(PGLOBAL g) {
/*********************************************************************/
return AllocateBuffer(g);
} // end of OpenTableFile
#endif 0
/***********************************************************************/
/* Allocate the line buffer. For mode Delete a bigger buffer has to */
/* be allocated because is it also used to move lines into the file. */
/***********************************************************************/
bool BINFAM::AllocateBuffer(PGLOBAL g) {
bool BINFAM::AllocateBuffer(PGLOBAL g)
{
MODE mode = Tdbp->GetMode();
// Lrecl is Ok
......@@ -1749,6 +1754,7 @@ bool BINFAM::AllocateBuffer(PGLOBAL g) {
return false;
} // end of AllocateBuffer
#if 0
/***********************************************************************/
/* GetRowID: return the RowID of last read record. */
/***********************************************************************/
......@@ -1767,7 +1773,7 @@ int BINFAM::GetPos(void) {
/* GetNextPos: return the position of next record. */
/***********************************************************************/
int BINFAM::GetNextPos(void) {
return ftell(BStream);
return ftell(Stream);
} // end of GetNextPos
/***********************************************************************/
......@@ -1776,7 +1782,7 @@ int BINFAM::GetNextPos(void) {
bool BINFAM::SetPos(PGLOBAL g, int pos) {
Fpos = pos;
if (fseek(BStream, Fpos, SEEK_SET)) {
if (fseek(Stream, Fpos, SEEK_SET)) {
sprintf(g->Message, MSG(FSETPOS_ERROR), Fpos);
return true;
} // endif
......@@ -1789,7 +1795,7 @@ bool BINFAM::SetPos(PGLOBAL g, int pos) {
/* Record file position in case of UPDATE or DELETE. */
/***********************************************************************/
bool BINFAM::RecordPos(PGLOBAL g) {
if ((Fpos = ftell(BStream)) < 0) {
if ((Fpos = ftell(Stream)) < 0) {
sprintf(g->Message, MSG(FTELL_ERROR), 0, strerror(errno));
// strcat(g->Message, " (possible wrong ENDING option value)");
return true;
......@@ -1797,14 +1803,16 @@ bool BINFAM::RecordPos(PGLOBAL g) {
return false;
} // end of RecordPos
#endif // 0
/***********************************************************************/
/* ReadBuffer: Read one line for a text file. */
/***********************************************************************/
int BINFAM::ReadBuffer(PGLOBAL g) {
int BINFAM::ReadBuffer(PGLOBAL g)
{
int rc;
if (!BStream)
if (!Stream)
return RC_EF;
xtrc(2, "ReadBuffer: Tdbp=%p To_Line=%p Placed=%d\n",
......@@ -1823,11 +1831,11 @@ int BINFAM::ReadBuffer(PGLOBAL g) {
Placed = false;
xtrc(2, " About to read: bstream=%p To_Buf=%p Buflen=%d\n",
BStream, To_Buf, Buflen);
Stream, To_Buf, Buflen);
// Read the prefix giving the row length
if (!fread(&Recsize, sizeof(size_t), 1, BStream)) {
if (!feof(BStream)) {
if (!fread(&Recsize, sizeof(size_t), 1, Stream)) {
if (!feof(Stream)) {
strcpy(g->Message, "Error reading line prefix\n");
return RC_FX;
} else
......@@ -1838,12 +1846,12 @@ int BINFAM::ReadBuffer(PGLOBAL g) {
return RC_FX;
} // endif Recsize
if (fread(To_Buf, Recsize, 1, BStream)) {
if (fread(To_Buf, Recsize, 1, Stream)) {
xtrc(2, " Read: To_Buf=%p Recsize=%zd\n", To_Buf, Recsize);
// memcpy(Tdbp->GetLine(), To_Buf, Recsize);
num_read++;
rc = RC_OK;
} else if (feof(BStream)) {
} else if (feof(Stream)) {
rc = RC_EF;
} else {
#if defined(__WIN__)
......@@ -1863,23 +1871,24 @@ int BINFAM::ReadBuffer(PGLOBAL g) {
/***********************************************************************/
/* WriteBuffer: File write routine for BIN access method. */
/***********************************************************************/
int BINFAM::WriteBuffer(PGLOBAL g) {
int BINFAM::WriteBuffer(PGLOBAL g)
{
int curpos = 0;
bool moved = true;
/*********************************************************************/
/* Prepare writing the line. */
/*********************************************************************/
memcpy(To_Buf, Tdbp->GetLine(), Recsize);
//memcpy(To_Buf, Tdbp->GetLine(), Recsize);
/*********************************************************************/
/* Now start the writing process. */
/*********************************************************************/
if (fwrite(&Recsize, sizeof(size_t), 1, BStream) != 1) {
if (fwrite(&Recsize, sizeof(size_t), 1, Stream) != 1) {
sprintf(g->Message, "Error %d writing prefix to %s",
errno, To_File);
return RC_FX;
} else if (fwrite(To_Buf, Recsize, 1, BStream) != 1) {
} else if (fwrite(To_Buf, Recsize, 1, Stream) != 1) {
sprintf(g->Message, "Error %d writing %zd bytes to %s",
errno, Recsize, To_File);
return RC_FX;
......@@ -1889,24 +1898,153 @@ int BINFAM::WriteBuffer(PGLOBAL g) {
return RC_OK;
} // end of WriteBuffer
#if 0
/***********************************************************************/
/* Data Base delete line routine for DOS and BLK access methods. */
/***********************************************************************/
int DOSFAM::DeleteRecords(PGLOBAL g, int irc)
{
bool moved;
int curpos = ftell(Stream);
/*********************************************************************/
/* There is an alternative here: */
/* 1 - use a temporary file in which are copied all not deleted */
/* lines, at the end the original file will be deleted and */
/* the temporary file renamed to the original file name. */
/* 2 - directly move the not deleted lines inside the original */
/* file, and at the end erase all trailing records. */
/* This will be experimented. */
/*********************************************************************/
if (trace(1))
htrc(
"DOS DeleteDB: rc=%d UseTemp=%d curpos=%d Fpos=%d Tpos=%d Spos=%d\n",
irc, UseTemp, curpos, Fpos, Tpos, Spos);
if (irc != RC_OK) {
/*******************************************************************/
/* EOF: position Fpos at the end-of-file position. */
/*******************************************************************/
fseek(Stream, 0, SEEK_END);
Fpos = ftell(Stream);
if (trace(1))
htrc("Fpos placed at file end=%d\n", Fpos);
} // endif irc
if (Tpos == Spos) {
/*******************************************************************/
/* First line to delete, Open temporary file. */
/*******************************************************************/
if (UseTemp) {
if (OpenTempFile(g))
return RC_FX;
} else {
/*****************************************************************/
/* Move of eventual preceding lines is not required here. */
/* Set the target file as being the source file itself. */
/* Set the future Tpos, and give Spos a value to block copying. */
/*****************************************************************/
T_Stream = Stream;
Spos = Tpos = Fpos;
} // endif UseTemp
} // endif Tpos == Spos
/*********************************************************************/
/* Move any intermediate lines. */
/*********************************************************************/
if (MoveIntermediateLines(g, &moved))
return RC_FX;
if (irc == RC_OK) {
/*******************************************************************/
/* Reposition the file pointer and set Spos. */
/*******************************************************************/
if (!UseTemp || moved)
if (fseek(Stream, curpos, SEEK_SET)) {
sprintf(g->Message, MSG(FSETPOS_ERROR), 0);
return RC_FX;
} // endif
Spos = GetNextPos(); // New start position
if (trace(1))
htrc("after: Tpos=%d Spos=%d\n", Tpos, Spos);
} else {
/*******************************************************************/
/* Last call after EOF has been reached. */
/* The UseTemp case is treated in CloseTableFile. */
/*******************************************************************/
if (!UseTemp & !Abort) {
/*****************************************************************/
/* Because the chsize functionality is only accessible with a */
/* system call we must close the file and reopen it with the */
/* open function (_fopen for MS ??) this is still to be checked */
/* for compatibility with Text files and other OS's. */
/*****************************************************************/
char filename[_MAX_PATH];
int h; // File handle, return code
PlugSetPath(filename, To_File, Tdbp->GetPath());
/*rc=*/ PlugCloseFile(g, To_Fb);
if ((h= global_open(g, MSGID_OPEN_STRERROR, filename, O_WRONLY)) <= 0)
return RC_FX;
/*****************************************************************/
/* Remove extra records. */
/*****************************************************************/
#if defined(__WIN__)
if (chsize(h, Tpos)) {
sprintf(g->Message, MSG(CHSIZE_ERROR), strerror(errno));
close(h);
return RC_FX;
} // endif
#else
if (ftruncate(h, (off_t)Tpos)) {
sprintf(g->Message, MSG(TRUNCATE_ERROR), strerror(errno));
close(h);
return RC_FX;
} // endif
#endif
close(h);
if (trace(1))
htrc("done, h=%d irc=%d\n", h, irc);
} // endif !UseTemp
} // endif irc
return RC_OK; // All is correct
} // end of DeleteRecords
#endif // 0
/***********************************************************************/
/* Table file close routine for DOS access method. */
/***********************************************************************/
void BINFAM::CloseTableFile(PGLOBAL g, bool abort) {
int rc;
void BINFAM::CloseTableFile(PGLOBAL g, bool abort)
{
int rc;
Abort = abort;
rc = PlugCloseFile(g, To_Fb);
xtrc(1, "BIN Close: closing %s rc=%d\n", To_File, rc);
BStream = NULL; // So we can know whether table is open
Abort = abort;
rc = PlugCloseFile(g, To_Fb);
xtrc(1, "BIN Close: closing %s rc=%d\n", To_File, rc);
Stream = NULL; // So we can know whether table is open
} // end of CloseTableFile
/***********************************************************************/
/* Rewind routine for BIN access method. */
/***********************************************************************/
void BINFAM::Rewind(void) {
if (BStream) // Can be NULL when making index on void table
rewind(BStream);
void BINFAM::Rewind(void)
{
if (Stream) // Can be NULL when making index on void table
rewind(Stream);
Rows = 0;
OldBlk = CurBlk = -1;
......
......@@ -215,16 +215,16 @@ class DllExport BLKFAM : public DOSFAM {
/* This is the DOS/UNIX Access Method class declaration for binary */
/* files with variable record format (BJSON) */
/***********************************************************************/
class DllExport BINFAM : public TXTFAM {
class DllExport BINFAM : public DOSFAM {
public:
// Constructor
BINFAM(PDOSDEF tdp) : TXTFAM(tdp) {BStream = NULL; Recsize = 0;}
BINFAM(PBINFAM txfp) : TXTFAM(txfp) {BStream = txfp->BStream;}
BINFAM(PDOSDEF tdp) : DOSFAM(tdp) {Recsize = 0;}
BINFAM(PBINFAM txfp) : DOSFAM(txfp) {Recsize = txfp->Recsize;}
// Implementation
virtual AMT GetAmType(void) {return TYPE_AM_BIN;}
virtual int GetPos(void);
virtual int GetNextPos(void);
//virtual int GetPos(void);
//virtual int GetNextPos(void);
virtual PTXF Duplicate(PGLOBAL g) { return (PTXF)new(g) BINFAM(this); }
// Methods
......@@ -233,23 +233,22 @@ class DllExport BINFAM : public TXTFAM {
virtual int Cardinality(PGLOBAL g);
virtual int MaxBlkSize(PGLOBAL g, int s) {return s;}
virtual bool AllocateBuffer(PGLOBAL g);
virtual int GetRowID(void);
virtual bool RecordPos(PGLOBAL g);
virtual bool SetPos(PGLOBAL g, int recpos);
//virtual int GetRowID(void);
//virtual bool RecordPos(PGLOBAL g);
//virtual bool SetPos(PGLOBAL g, int recpos);
virtual int SkipRecord(PGLOBAL g, bool header) {return 0;}
virtual bool OpenTableFile(PGLOBAL g);
//virtual bool OpenTableFile(PGLOBAL g);
virtual int ReadBuffer(PGLOBAL g);
virtual int WriteBuffer(PGLOBAL g);
virtual int DeleteRecords(PGLOBAL g, int irc) {return RC_FX;}
//virtual int DeleteRecords(PGLOBAL g, int irc);
virtual void CloseTableFile(PGLOBAL g, bool abort);
virtual void Rewind(void);
protected:
//protected:
//virtual int InitDelete(PGLOBAL g, int fpos, int spos);
// Members
FILE *BStream; // Points to Bin file structure
size_t Recsize; // Length of last read record
size_t Recsize; // Length of last read or next written record
}; // end of class BINFAM
#endif // __FILAMTXT_H
......@@ -1012,7 +1012,7 @@ PBVAL BCUTIL::GetRow(PGLOBAL g)
PBVAL nwr, row = Tp->Row;
for (int i = 0; i < nod && row; i++) {
if (nodes[i + 1].Op == OP_XX)
if (i < nod-1 && nodes[i+1].Op == OP_XX)
break;
else switch (row->Type) {
case TYPE_JOB:
......@@ -1411,29 +1411,31 @@ int TDBBSN::EstimatedLength(void)
/***********************************************************************/
bool TDBBSN::OpenDB(PGLOBAL g)
{
TUSE use = Use;
if (Pretty < 0 && Mode == MODE_UPDATE) {
sprintf(g->Message, "Mode %d NIY for Bjson", Mode);
return true;
} // endif Mode
if (Use == USE_OPEN) {
/*******************************************************************/
/* Table already open replace it at its beginning. */
/* Table already open replace it at its beginning. ??? */
/*******************************************************************/
Fpos = -1;
NextSame = 0;
SameRow = 0;
} else {
/*******************************************************************/
/* First opening. */
/*******************************************************************/
if (Mode == MODE_INSERT)
switch (Jmode) {
case MODE_OBJECT: Row = Bp->NewVal(TYPE_JOB); break;
case MODE_ARRAY: Row = Bp->NewVal(TYPE_JAR); break;
case MODE_VALUE: Row = Bp->NewVal(TYPE_JVAL); break;
default:
sprintf(g->Message, "Invalid Jmode %d", Jmode);
return true;
} // endswitch Jmode
} // endif Use
/*********************************************************************/
/* Open according to logical input/output mode required. */
/*********************************************************************/
if (TDBDOS::OpenDB(g))
return true;
if (use == USE_OPEN)
return false;
if (Pretty < 0) {
/*******************************************************************/
/* Binary BJSON table. */
......@@ -1441,45 +1443,45 @@ bool TDBBSN::OpenDB(PGLOBAL g)
xtrc(1, "JSN OpenDB: tdbp=%p tdb=R%d use=%d mode=%d\n",
this, Tdb_No, Use, Mode);
if (Use == USE_OPEN) {
/*******************************************************************/
/* Table already open, just replace it at its beginning. */
/*******************************************************************/
if (!To_Kindex) {
Txfp->Rewind(); // see comment in Work.log
} else // Table is to be accessed through a sorted index table
To_Kindex->Reset(); // TODO: NIY
return false;
} // endif use
/*********************************************************************/
/* Open according to logical input/output mode required. */
/* Use conventionnal input/output functions. */
/*********************************************************************/
if (Txfp->OpenTableFile(g))
return true;
Use = USE_OPEN; // Do it now in case we are recursively called
/*********************************************************************/
/* Lrecl is Ok. */
/* Lrecl is Ok. */
/*********************************************************************/
size_t linelen = Lrecl;
// Buffer should be the first allocated thing in G->Sarea
// Buffer must be set to G->Sarea
Txfp->AllocateBuffer(Bp->G);
if (Mode == MODE_INSERT)
Bp->SubSet(true);
else
Bp->MemSave();
To_Line = Txfp->GetBuf();
memset(To_Line, 0, linelen);
Bp->MemSave();
xtrc(1, "OpenJSN: R%hd mode=%d To_Line=%p\n", Tdb_No, Mode, To_Line);
} else if (TDBDOS::OpenDB(g))
return true;
} // endif Pretty
/***********************************************************************/
/* First opening. */
/***********************************************************************/
if (Mode == MODE_INSERT) {
switch (Jmode) {
case MODE_OBJECT: Row = Bp->NewVal(TYPE_JOB); break;
case MODE_ARRAY: Row = Bp->NewVal(TYPE_JAR); break;
case MODE_VALUE: Row = Bp->NewVal(TYPE_JVAL); break;
default:
sprintf(g->Message, "Invalid Jmode %d", Jmode);
return true;
} // endswitch Jmode
Bp->MemSave();
} // endif Mode
if (Xcol)
To_Filter = NULL; // Imcompatible
return false;
} // end of OpenDB
/***********************************************************************/
......@@ -1564,26 +1566,30 @@ int TDBBSN::ReadDB(PGLOBAL g)
/***********************************************************************/
bool TDBBSN::PrepareWriting(PGLOBAL g)
{
PSZ s;
if (Pretty >= 0) {
PSZ s;
if (!(Top = Bp->MakeTopTree(g, Row)))
return true;
if (!(Top = Bp->MakeTopTree(g, Row)))
return true;
if ((s = Bp->SerialVal(g, Top, Pretty))) {
if (Comma)
strcat(s, ",");
if ((s = Bp->SerialVal(g, Top, Pretty))) {
if (Comma)
strcat(s, ",");
if ((signed)strlen(s) > Lrecl) {
strncpy(To_Line, s, Lrecl);
sprintf(g->Message, "Line truncated (lrecl=%d)", Lrecl);
return PushWarning(g, this);
} else
strcpy(To_Line, s);
if ((signed)strlen(s) > Lrecl) {
strncpy(To_Line, s, Lrecl);
sprintf(g->Message, "Line truncated (lrecl=%d)", Lrecl);
return PushWarning(g, this);
} else
strcpy(To_Line, s);
return false;
return false;
} else
return true;
} else
return true;
((BINFAM*)Txfp)->Recsize = ((size_t)PlugSubAlloc(Bp->G, NULL, 0)
- (size_t)To_Line);
return false;
} // end of PrepareWriting
/***********************************************************************/
......@@ -2034,6 +2040,7 @@ void BSONCOL::WriteColumn(PGLOBAL g)
else
Cp->AddArrayValue(row, jsp);
break;
case TYPE_JOB:
if (Nodes[Nod - 1].Key)
Cp->SetKeyValue(row, jsp, Nodes[Nod - 1].Key);
......
......@@ -2148,6 +2148,9 @@ bool TDBDOS::OpenDB(PGLOBAL g)
} // endif use
if (Mode == MODE_DELETE && !Next && Txfp->GetAmType() != TYPE_AM_DOS
#if defined(BSON_SUPPORT)
&& Txfp->GetAmType() != TYPE_AM_BIN
#endif // BSON_SUPPORT
&& Txfp->GetAmType() != TYPE_AM_MGO) {
// Delete all lines. Not handled in MAP or block mode
Txfp = new(g) DOSFAM((PDOSDEF)To_Def);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment