Commit 2fd4f480 authored by Paul McCullagh's avatar Paul McCullagh

Merged changes for bug fix update 1.0.08c RC2

parent 10cedc20
PBXT Release Notes
==================
------- 1.0.08c RC2 - 2009-08-18
RN266: Updated BLOB streaming glue, used with the PBMS engine. The glue code is now identical to the version of "1.0.08-rc-pbms" version of PBXT available from http://blobstreaming.org/download.
RN265: Changes the sequential reading of data log files to skip gaps, instead of returning EOF. This ensures that extended data records are preserved even when something goes wrong with the way the file is written.
RN264: Fixed a bug that cased an "Data log not found" error after an out of disk space error on a log file. This bug is similar to RN262 in that it allows "gaps" to appear in the data logs.
RN263: Updated xtstat to compile on Windows/MS Visual C++.
RN262: Merged changes for PBMS version 0.5.09.
RN261: Concerning bug #377788: Cannot find index for FK. Fixed buffer overflow which occurred when the error was reported.
RN260: Fixed bug #377788: Cannot find index for FK. PBXT now correctly uses prefix of an index to support FK references (e.g. if key = (c1, c2) then an index on (c1, c2, c3) will work). Also fixed buffer overflow, which occurred when reporting the error.
RN259: Fixed bug #309424: xtstat doesn't use my.cnf. You can now add an [xtstat] section to my.cnf, for use with xtstat.
RN258: updated xt_p_join implementation for Windows to check if a thread has already exited or has not yet started
RN257: Removed false assertion that could fail during restore if a transaction log page was zero-filled
RN256: Update datalog eof pointer only if write opearions were sucessful
RN255: Added re-allocation of of filemap if allocating the of the new map failed. This often happens if there's not enough space on disk.
RN254: When a table with a corrupted index is detected, PBXT creates a file called 'repair-pending' in the pbxt directory, with the name of the table in it. Each table in the file is listed on a line by itself (the last line has no trailing \n). When the table is repaired (using the REPAIR TABLE command), this entry is removed from the file.
RN253: Use fcntl(F_FULLFSYNC) instead of fsync on platforms that support it. Improper fsync operation was presumably the reason of index corruption on Mac OS X.
RN252: Fixed bug #368692: PBXT not reporting data size correctly in information_schema.
------- 1.0.08 RC2 - 2009-06-30
RN251: A Windows-specific test update, also removed false assertion that failed on Windows.
RN250: Fixed a bug that caused recovery to fail when the transaction log ID exceeded 255. The problem was a checksum failed in the log record.
RN249: Fixed bug #313176: Test case timeout. This happened because record cache pages where not propoerly freed and as soon as cache filled up the performacne degraded.
RN249: Fixed bug #313176: Test case timeout. This happened because record cache pages where not properly freed and as soon as cache filled up the performacne degraded.
RN248: PBXT now compiles and runs with MySQL 5.1.35. All tests pass.
......
......@@ -1735,8 +1735,8 @@ void XTDDConstraint::alterColumnName(XTThreadPtr self, char *from_name, char *to
void XTDDConstraint::getColumnList(char *buffer, size_t size)
{
if (co_table->dt_table) {
xt_strcat(size, buffer, "`");
xt_strcpy(size, buffer, co_table->dt_table->tab_name->ps_path);
xt_strcpy(size, buffer, "`");
xt_strcat(size, buffer, co_table->dt_table->tab_name->ps_path);
xt_strcat(size, buffer, "` (`");
}
else
......@@ -1763,6 +1763,20 @@ bool XTDDConstraint::sameColumns(XTDDConstraint *co)
return OK;
}
bool XTDDConstraint::samePrefixColumns(XTDDConstraint *co)
{
u_int i = 0;
if (co_cols.size() > co->co_cols.size())
return false;
while (i<co_cols.size()) {
if (myxt_strcasecmp(co_cols.itemAt(i)->cr_col_name, co->co_cols.itemAt(i)->cr_col_name) != 0)
return false;
i++;
}
return OK;
}
bool XTDDConstraint::attachColumns()
{
XTDDColumn *col;
......@@ -2176,6 +2190,20 @@ bool XTDDForeignKey::sameReferenceColumns(XTDDConstraint *co)
return OK;
}
bool XTDDForeignKey::samePrefixReferenceColumns(XTDDConstraint *co)
{
u_int i = 0;
if (fk_ref_cols.size() > co->co_cols.size())
return false;
while (i<fk_ref_cols.size()) {
if (myxt_strcasecmp(fk_ref_cols.itemAt(i)->cr_col_name, co->co_cols.itemAt(i)->cr_col_name) != 0)
return false;
i++;
}
return OK;
}
bool XTDDForeignKey::checkReferencedTypes(XTDDTable *dt)
{
XTDDColumn *col, *ref_col;
......@@ -2720,16 +2748,24 @@ void XTDDTable::checkForeignKeys(XTThreadPtr self, bool temp_table)
XTDDIndex *XTDDTable::findIndex(XTDDConstraint *co)
{
XTDDIndex *ind;
XTDDIndex *ind = NULL;
XTDDIndex *cur_ind;
u_int index_size = UINT_MAX;
for (u_int i=0; i<dt_indexes.size(); i++) {
ind = dt_indexes.itemAt(i);
if (co->sameColumns(ind))
return ind;
cur_ind = dt_indexes.itemAt(i);
u_int sz = cur_ind->getIndexPtr()->mi_key_size;
if (sz < index_size && co->samePrefixColumns(cur_ind)) {
ind = cur_ind;
index_size = sz;
}
}
if (ind)
return ind;
{
char buffer[XT_ERR_MSG_SIZE - 200];
co->getColumnList(buffer, XT_ERR_MSG_SIZE - 200);
xt_register_ixterr(XT_REG_CONTEXT, XT_ERR_NO_MATCHING_INDEX, buffer);
}
......@@ -2738,16 +2774,24 @@ XTDDIndex *XTDDTable::findIndex(XTDDConstraint *co)
XTDDIndex *XTDDTable::findReferenceIndex(XTDDForeignKey *fk)
{
XTDDIndex *ind;
XTDDIndex *ind = NULL;
XTDDIndex *cur_ind;
XTDDColumnRef *cr;
u_int i;
u_int index_size = UINT_MAX;
for (i=0; i<dt_indexes.size(); i++) {
ind = dt_indexes.itemAt(i);
if (fk->sameReferenceColumns(ind))
return ind;
cur_ind = dt_indexes.itemAt(i);
u_int sz = cur_ind->getIndexPtr()->mi_key_size;
if (sz < index_size && fk->samePrefixReferenceColumns(cur_ind)) {
ind = cur_ind;
index_size = sz;
}
}
if (ind)
return ind;
/* If the index does not exist, maybe the columns do not exist?! */
for (i=0; i<fk->fk_ref_cols.size(); i++) {
cr = fk->fk_ref_cols.itemAt(i);
......
......@@ -171,6 +171,7 @@ class XTDDConstraint : public XTObject {
virtual void alterColumnName(XTThreadPtr self, char *from_name, char *to_name);
void getColumnList(char *buffer, size_t size);
bool sameColumns(XTDDConstraint *co);
bool samePrefixColumns(XTDDConstraint *co);
bool attachColumns();
};
......@@ -240,6 +241,7 @@ class XTDDForeignKey : public XTDDIndex {
void getReferenceList(char *buffer, size_t size);
struct XTIndex *getReferenceIndexPtr();
bool sameReferenceColumns(XTDDConstraint *co);
bool samePrefixReferenceColumns(XTDDConstraint *co);
bool checkReferencedTypes(XTDDTable *dt);
void removeReference(XTThreadPtr self);
bool insertRow(xtWord1 *before, xtWord1 *after, XTThreadPtr thread);
......
......@@ -69,6 +69,7 @@ xtBool XTDataSeqRead::sl_seq_init(struct XTDatabase *db, size_t buffer_size)
sl_rec_log_id = 0;
sl_rec_log_offset = 0;
sl_record_len = 0;
sl_extra_garbage = 0;
return sl_buffer != NULL;
}
......@@ -130,8 +131,25 @@ xtBool XTDataSeqRead::sl_rnd_read(xtLogOffset log_offset, size_t size, xtWord1 *
/*
* Unlike the transaction log sequential reader, this function only returns
* the header of a record.
*
* {SKIP-GAPS}
* This function now skips gaps. This should not be required, because in normal
* operation, no gaps should be created.
*
* However, if his happens there is a danger that a valid record after the
* gap will be lost.
*
* So, if we find an invalid record, we scan through the log to find the next
* valid record. Note, that there is still a danger that will will find
* data that looks like a valid record, but is not.
*
* In this case, this "pseudo record" may cause the function to actually skip
* valid records.
*
* Note, any such malfunction will eventually cause the record to be lost forever
* after the garbage collector has run.
*/
xtBool XTDataSeqRead::sl_seq_next(XTXactLogBufferDPtr *ret_entry, xtBool verify, struct XTThread *thread)
xtBool XTDataSeqRead::sl_seq_next(XTXactLogBufferDPtr *ret_entry, struct XTThread *thread)
{
XTXactLogBufferDPtr record;
size_t tfer;
......@@ -140,10 +158,12 @@ xtBool XTDataSeqRead::sl_seq_next(XTXactLogBufferDPtr *ret_entry, xtBool verify,
size_t max_rec_len;
xtBool reread_from_buffer;
xtWord4 size;
xtLogOffset gap_start = 0;
/* Go to the next record (xseq_record_len must be initialized
* to 0 for this to work.
*/
retry:
sl_rec_log_offset += sl_record_len;
sl_record_len = 0;
......@@ -174,6 +194,8 @@ xtBool XTDataSeqRead::sl_seq_next(XTXactLogBufferDPtr *ret_entry, xtBool verify,
record = (XTXactLogBufferDPtr) (sl_buffer + rec_offset);
switch (record->xl.xl_status_1) {
case XT_LOG_ENT_HEADER:
if (sl_rec_log_offset != 0)
goto scan_to_next_record;
if (offsetof(XTXactLogHeaderDRec, xh_size_4) + 4 > max_rec_len) {
reread_from_buffer = TRUE;
goto read_more;
......@@ -183,33 +205,42 @@ xtBool XTDataSeqRead::sl_seq_next(XTXactLogBufferDPtr *ret_entry, xtBool verify,
reread_from_buffer = TRUE;
goto read_more;
}
if (verify) {
if (record->xh.xh_checksum_1 != XT_CHECKSUM_1(sl_rec_log_id))
goto return_empty;
if (XT_LOG_HEAD_MAGIC(record, len) != XT_LOG_FILE_MAGIC)
if (record->xh.xh_checksum_1 != XT_CHECKSUM_1(sl_rec_log_id))
goto return_empty;
if (XT_LOG_HEAD_MAGIC(record, len) != XT_LOG_FILE_MAGIC)
goto return_empty;
if (len > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
if (XT_GET_DISK_4(record->xh.xh_log_id_4) != sl_rec_log_id)
goto return_empty;
if (len > offsetof(XTXactLogHeaderDRec, xh_log_id_4) + 4) {
if (XT_GET_DISK_4(record->xh.xh_log_id_4) != sl_rec_log_id)
goto return_empty;
}
}
break;
case XT_LOG_ENT_EXT_REC_OK:
case XT_LOG_ENT_EXT_REC_DEL:
if (gap_start) {
xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap in data log %lu, start: %llu, size: %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start, (u_llong) (sl_rec_log_offset - gap_start));
gap_start = 0;
}
len = offsetof(XTactExtRecEntryDRec, er_data);
if (len > max_rec_len) {
reread_from_buffer = TRUE;
goto read_more;
}
size = XT_GET_DISK_4(record->er.er_data_size_4);
if (verify) {
if (sl_rec_log_offset + (xtLogOffset) offsetof(XTactExtRecEntryDRec, er_data) + size > sl_log_eof)
goto return_empty;
}
/* Verify the record as good as we can! */
if (!size)
goto scan_to_next_record;
if (sl_rec_log_offset + (xtLogOffset) offsetof(XTactExtRecEntryDRec, er_data) + size > sl_log_eof)
goto scan_to_next_record;
if (!XT_GET_DISK_4(record->er.er_tab_id_4))
goto scan_to_next_record;
if (!XT_GET_DISK_4(record->er.er_rec_id_4))
goto scan_to_next_record;
break;
default:
ASSERT_NS(FALSE);
goto return_empty;
/* Note, we no longer assume EOF.
* Instead, we skip to the next value record. */
goto scan_to_next_record;
}
if (len <= max_rec_len) {
......@@ -243,7 +274,20 @@ xtBool XTDataSeqRead::sl_seq_next(XTXactLogBufferDPtr *ret_entry, xtBool verify,
*ret_entry = (XTXactLogBufferDPtr) sl_buffer;
return OK;
scan_to_next_record:
if (!gap_start) {
gap_start = sl_rec_log_offset;
xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap found in data log %lu, starting at offset %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start);
}
sl_record_len = 1;
sl_extra_garbage++;
goto retry;
return_empty:
if (gap_start) {
xt_logf(XT_NS_CONTEXT, XT_LOG_WARNING, "Gap in data log %lu, start: %llu, size: %llu\n", (u_long) sl_rec_log_id, (u_llong) gap_start, (u_llong) (sl_rec_log_offset - gap_start));
gap_start = 0;
}
*ret_entry = NULL;
return OK;
}
......@@ -285,23 +329,55 @@ static xtBool dl_create_log_header(XTDataLogFilePtr data_log, XTOpenFilePtr of,
return OK;
}
static xtBool dl_write_log_header(XTDataLogFilePtr data_log, XTOpenFilePtr of, xtBool flush, XTThreadPtr thread)
static xtBool dl_write_garbage_level(XTDataLogFilePtr data_log, XTOpenFilePtr of, xtBool flush, XTThreadPtr thread)
{
XTXactLogHeaderDRec header;
/* The header was not completely written, so write a new one: */
XT_SET_DISK_8(header.xh_free_space_8, data_log->dlf_garbage_count);
XT_SET_DISK_8(header.xh_file_len_8, data_log->dlf_log_eof);
XT_SET_DISK_8(header.xh_comp_pos_8, data_log->dlf_start_offset);
if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_free_space_8), 24, (xtWord1 *) &header.xh_free_space_8, &thread->st_statistics.st_data, thread))
if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_free_space_8), 8, (xtWord1 *) &header.xh_free_space_8, &thread->st_statistics.st_data, thread))
return FAILED;
if (flush && !xt_flush_file(of, &thread->st_statistics.st_data, thread))
return FAILED;
return OK;
}
static void dl_free_seq_read(XTThreadPtr XT_UNUSED(self), XTDataSeqReadPtr seq_read)
/*
* {SKIP-GAPS}
* Extra garbage is the amount of space skipped during recovery of the data
* log file. We assume this space has not be counted as garbage,
* and add it to the garbage count.
*
* This may mean that our estimate of garbaged is higher than it should
* be, but that is better than the other way around.
*
* The fact is, there should not be any gaps in the data log files, so
* this is actually an exeption which should not occur.
*/
static xtBool dl_write_log_header(XTDataLogFilePtr data_log, XTOpenFilePtr of, xtLogOffset extra_garbage, XTThreadPtr thread)
{
XTXactLogHeaderDRec header;
XT_SET_DISK_8(header.xh_file_len_8, data_log->dlf_log_eof);
if (extra_garbage) {
data_log->dlf_garbage_count += extra_garbage;
if (data_log->dlf_garbage_count > data_log->dlf_log_eof)
data_log->dlf_garbage_count = data_log->dlf_log_eof;
XT_SET_DISK_8(header.xh_free_space_8, data_log->dlf_garbage_count);
if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_free_space_8), 16, (xtWord1 *) &header.xh_free_space_8, &thread->st_statistics.st_data, thread))
return FAILED;
}
else {
if (!xt_pwrite_file(of, offsetof(XTXactLogHeaderDRec, xh_file_len_8), 8, (xtWord1 *) &header.xh_file_len_8, &thread->st_statistics.st_data, thread))
return FAILED;
}
if (!xt_flush_file(of, &thread->st_statistics.st_data, thread))
return FAILED;
return OK;
}
static void dl_free_seq_read(XTThreadPtr self __attribute__((unused)), XTDataSeqReadPtr seq_read)
{
seq_read->sl_seq_exit();
}
......@@ -318,7 +394,7 @@ static void dl_recover_log(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogFilePtr
seq_read.sl_seq_start(data_log->dlf_log_id, 0, FALSE);
for (;;) {
if (!seq_read.sl_seq_next(&record, TRUE, self))
if (!seq_read.sl_seq_next(&record, self))
xt_throw(self);
if (!record)
break;
......@@ -331,13 +407,18 @@ static void dl_recover_log(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogFilePtr
}
}
if (!(data_log->dlf_log_eof = seq_read.sl_rec_log_offset)) {
ASSERT_NS(seq_read.sl_log_eof == seq_read.sl_rec_log_offset);
data_log->dlf_log_eof = seq_read.sl_rec_log_offset;
if (data_log->dlf_log_eof < sizeof(XTXactLogHeaderDRec)) {
data_log->dlf_log_eof = sizeof(XTXactLogHeaderDRec);
if (!dl_create_log_header(data_log, seq_read.sl_log_file, self))
xt_throw(self);
}
if (!dl_write_log_header(data_log, seq_read.sl_log_file, TRUE, self))
xt_throw(self);
else {
if (!dl_write_log_header(data_log, seq_read.sl_log_file, seq_read.sl_extra_garbage, self))
xt_throw(self);
}
freer_(); // dl_free_seq_read(&seq_read)
}
......@@ -1110,7 +1191,6 @@ xtBool XTDataLogBuffer::dlb_get_log_offset(xtLogID *log_id, xtLogOffset *out_off
*log_id = dlb_data_log->dlf_log_id;
*out_offset = dlb_data_log->dlf_log_eof;
dlb_data_log->dlf_log_eof += req_size;
return OK;
}
......@@ -1158,6 +1238,11 @@ xtBool XTDataLogBuffer::dlb_write_thru_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtL
if (!xt_pwrite_file(dlb_data_log->dlf_log_file, log_offset, size, data, &thread->st_statistics.st_data, thread))
return FAILED;
/* Increment of dlb_data_log->dlf_log_eof was moved here from dlb_get_log_offset()
* to ensure it is done after a successful update of the log, otherwise otherwise a
* gap occurs in the log which cause eof to be detected in middle of the log
*/
dlb_data_log->dlf_log_eof += size;
#ifdef DEBUG
if (log_offset + size > dlb_max_write_offset)
dlb_max_write_offset = log_offset + size;
......@@ -1179,10 +1264,12 @@ xtBool XTDataLogBuffer::dlb_append_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOf
if (dlb_buffer_size >= dlb_buffer_len + size) {
memcpy(dlb_log_buffer + dlb_buffer_len, data, size);
dlb_buffer_len += size;
dlb_data_log->dlf_log_eof += size;
return OK;
}
}
dlb_flush_log(FALSE, thread);
if (dlb_flush_log(FALSE, thread) != OK)
return FAILED;
}
ASSERT_NS(dlb_buffer_len == 0);
......@@ -1191,6 +1278,7 @@ xtBool XTDataLogBuffer::dlb_append_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOf
dlb_buffer_offset = log_offset;
dlb_buffer_len = size;
memcpy(dlb_log_buffer, data, size);
dlb_data_log->dlf_log_eof += size;
return OK;
}
......@@ -1202,6 +1290,7 @@ xtBool XTDataLogBuffer::dlb_append_log(xtLogID XT_NDEBUG_UNUSED(log_id), xtLogOf
dlb_max_write_offset = log_offset + size;
#endif
dlb_flush_required = TRUE;
dlb_data_log->dlf_log_eof += size;
return OK;
}
......@@ -1306,7 +1395,7 @@ xtBool XTDataLogBuffer::dlb_delete_log(xtLogID log_id, xtLogOffset log_offset, s
xt_lock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
dlb_data_log->dlf_garbage_count += offsetof(XTactExtRecEntryDRec, er_data) + size;
ASSERT_NS(dlb_data_log->dlf_garbage_count < dlb_data_log->dlf_log_eof);
if (!dl_write_log_header(dlb_data_log, dlb_data_log->dlf_log_file, FALSE, thread)) {
if (!dl_write_garbage_level(dlb_data_log, dlb_data_log->dlf_log_file, FALSE, thread)) {
xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
return FAILED;
}
......@@ -1329,7 +1418,7 @@ xtBool XTDataLogBuffer::dlb_delete_log(xtLogID log_id, xtLogOffset log_offset, s
xt_lock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
data_log->dlf_garbage_count += offsetof(XTactExtRecEntryDRec, er_data) + size;
ASSERT_NS(data_log->dlf_garbage_count < data_log->dlf_log_eof);
if (!dl_write_log_header(data_log, open_log->odl_log_file, FALSE, thread)) {
if (!dl_write_garbage_level(data_log, open_log->odl_log_file, FALSE, thread)) {
xt_unlock_mutex_ns(&dlb_db->db_datalogs.dlc_head_lock);
goto failed;
}
......@@ -1674,7 +1763,7 @@ static xtBool dl_collect_garbage(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogF
xt_lock_mutex_ns(&db->db_datalogs.dlc_head_lock);
data_log->dlf_garbage_count += garbage_count;
ASSERT(data_log->dlf_garbage_count < data_log->dlf_log_eof);
if (!dl_write_log_header(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
if (!dl_write_garbage_level(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
xt_throw(self);
}
......@@ -1683,7 +1772,7 @@ static xtBool dl_collect_garbage(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogF
freer_(); // dl_free_compactor_state(&cs)
return FAILED;
}
if (!cs.cs_seqread->sl_seq_next(&record, TRUE, self))
if (!cs.cs_seqread->sl_seq_next(&record, self))
xt_throw(self);
cs.cs_seqread->sl_seq_pos(&curr_log_id, &curr_log_offset);
if (!record) {
......@@ -1809,7 +1898,7 @@ static xtBool dl_collect_garbage(XTThreadPtr self, XTDatabaseHPtr db, XTDataLogF
xt_lock_mutex_ns(&db->db_datalogs.dlc_head_lock);
data_log->dlf_garbage_count += garbage_count;
ASSERT(data_log->dlf_garbage_count < data_log->dlf_log_eof);
if (!dl_write_log_header(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
if (!dl_write_garbage_level(data_log, cs.cs_seqread->sl_seq_open_file(), TRUE, self)) {
xt_unlock_mutex_ns(&db->db_datalogs.dlc_head_lock);
xt_throw(self);
}
......
......@@ -183,8 +183,8 @@ typedef struct XTSeqLogRead {
virtual xtBool sl_rnd_read(xtLogOffset log_offset, size_t size, xtWord1 *data, size_t *read, struct XTThread *thread) {
(void) log_offset; (void) size; (void) data; (void) read; (void) thread; return OK;
};
virtual xtBool sl_seq_next(XTXactLogBufferDPtr *entry, xtBool verify, struct XTThread *thread) {
(void) entry; (void) verify; (void) thread; return OK;
virtual xtBool sl_seq_next(XTXactLogBufferDPtr *entry, struct XTThread *thread) {
(void) entry; (void) thread; return OK;
};
virtual void sl_seq_skip(size_t size) { (void) size; }
} XTSeqLogReadRec, *XTSeqLogReadPtr;
......@@ -195,6 +195,7 @@ typedef struct XTDataSeqRead : public XTSeqLogRead {
xtLogOffset sl_rec_log_offset; /* The current log read position. */
size_t sl_record_len; /* The length of the current record. */
xtLogOffset sl_log_eof;
xtLogOffset sl_extra_garbage; /* Garbage found during a scan. */
size_t sl_buffer_size; /* Size of the buffer. */
xtLogOffset sl_buf_log_offset; /* File offset of the buffer. */
......@@ -208,7 +209,7 @@ typedef struct XTDataSeqRead : public XTSeqLogRead {
virtual void sl_seq_pos(xtLogID *log_id, xtLogOffset *log_offset);
virtual xtBool sl_seq_start(xtLogID log_id, xtLogOffset log_offset, xtBool missing_ok);
virtual xtBool sl_rnd_read(xtLogOffset log_offset, size_t size, xtWord1 *data, size_t *read, struct XTThread *thread);
virtual xtBool sl_seq_next(XTXactLogBufferDPtr *entry, xtBool verify, struct XTThread *thread);
virtual xtBool sl_seq_next(XTXactLogBufferDPtr *entry, struct XTThread *thread);
virtual void sl_seq_skip(size_t size);
virtual void sl_seq_skip_to(off_t offset);
} XTDataSeqReadRec, *XTDataSeqReadPtr;
......
......@@ -54,6 +54,7 @@
//#define DEBUG_TRACE_IO
//#define DEBUG_TRACE_MAP_IO
//#define DEBUG_TRACE_FILES
//#define INJECT_WRITE_REMAP_ERROR
#endif
#ifdef DEBUG_TRACE_FILES
......@@ -61,6 +62,11 @@
#define PRINTF xt_trace
#endif
#ifdef INJECT_WRITE_REMAP_ERROR
#define INJECT_REMAP_FILE_SIZE 1000000
#define INJECT_REMAP_FILE_TYPE "xtd"
#endif
/* ----------------------------------------------------------------------
* Globals
*/
......@@ -867,12 +873,23 @@ xtPublic xtBool xt_flush_file(XTOpenFilePtr of, XTIOStatsPtr stat, XTThreadPtr X
xt_register_ferrno(XT_REG_CONTEXT, fs_get_win_error(), xt_file_path(of));
goto failed;
}
#else
/* Mac OS X has problems with fsync. We had several cases of index corruption presumably because
* fsync didn't really flush index pages to disk. fcntl(F_FULLFSYNC) is considered more effective
* in such case.
*/
#ifdef F_FULLFSYNC
if (fcntl(of->of_filedes, F_FULLFSYNC, 0) == -1) {
xt_register_ferrno(XT_REG_CONTEXT, errno, xt_file_path(of));
goto failed;
}
#else
if (fsync(of->of_filedes) == -1) {
xt_register_ferrno(XT_REG_CONTEXT, errno, xt_file_path(of));
goto failed;
}
#endif
#endif
#ifdef DEBUG_TRACE_IO
xt_trace("/* %s */ pbxt_file_sync(\"%s\");\n", xt_trace_clock_diff(timef, start), of->fr_file->fil_path);
#endif
......@@ -1185,6 +1202,15 @@ off_t xt_dir_file_size(XTThreadPtr self, XTOpenDirPtr od)
static xtBool fs_map_file(XTFileMemMapPtr mm, XTFilePtr file, xtBool grow)
{
#ifdef INJECT_WRITE_REMAP_ERROR
if (xt_is_extension(file->fil_path, INJECT_REMAP_FILE_TYPE)) {
if (mm->mm_length > INJECT_REMAP_FILE_SIZE) {
xt_register_ferrno(XT_REG_CONTEXT, 30, file->fil_path);
return FAILED;
}
}
#endif
ASSERT_NS(!mm->mm_start);
#ifdef XT_WIN
/* This will grow the file to the given size: */
......@@ -1373,14 +1399,23 @@ static xtBool fs_remap_file(XTMapFilePtr map, off_t offset, size_t size, XTIOSta
}
mm->mm_start = NULL;
#ifdef XT_WIN
if (!CloseHandle(mm->mm_mapdes))
/* It is possible that a previous remap attempt has failed: the map was closed
* but the new map was not allocated (e.g. because of insufficient disk space).
* In this case mm->mm_mapdes will be NULL.
*/
if (mm->mm_mapdes && !CloseHandle(mm->mm_mapdes))
return xt_register_ferrno(XT_REG_CONTEXT, fs_get_win_error(), xt_file_path(map));
mm->mm_mapdes = NULL;
#endif
off_t old_size = mm->mm_length;
mm->mm_length = new_size;
if (!fs_map_file(mm, map->fr_file, TRUE))
if (!fs_map_file(mm, map->fr_file, TRUE)) {
/* Try to restore old mapping */
mm->mm_length = old_size;
fs_map_file(mm, map->fr_file, FALSE);
return FAILED;
}
}
return OK;
......
......@@ -65,8 +65,8 @@ extern "C" char **session_query(Session *session);
#include "heap_xt.h"
#include "myxt_xt.h"
#include "datadic_xt.h"
#ifdef XT_STREAMING
#include "streaming_xt.h"
#ifdef PBMS_ENABLED
#include "pbms_enabled.h"
#endif
#include "tabcache_xt.h"
#include "systab_xt.h"
......@@ -968,8 +968,8 @@ static void ha_exit(XTThreadPtr self)
/* This may cause the streaming engine to cleanup connections and
* tables belonging to this engine. This in turn may require some of
* the stuff below (like xt_create_thread() called from pbxt_close_table()! */
#ifdef XT_STREAMING
xt_exit_streaming();
#ifdef PBMS_ENABLED
pbms_finalize();
#endif
pbxt_call_exit(self);
xt_exit_threading(self);
......@@ -1041,7 +1041,7 @@ static int pbxt_init(void *p)
XT_TRACE_CALL();
if (sizeof(xtWordPS) != sizeof(void *)) {
printf("PBXT: This won't work, I require that sizeof(xtWordPS) != sizeof(void *)!\n");
printf("PBXT: This won't work, I require that sizeof(xtWordPS) == sizeof(void *)!\n");
XT_RETURN(1);
}
......@@ -1078,9 +1078,12 @@ static int pbxt_init(void *p)
if (!xt_init_logging()) /* Initialize logging */
goto error_1;
#ifdef XT_STREAMING
if (!xt_init_streaming())
#ifdef PBMS_ENABLED
PBMSResultRec result;
if (!pbms_initialize("PBXT", false, &result)) {
xt_logf(XT_NT_ERROR, "pbms_initialize() Error: %s", result.mr_message);
goto error_2;
}
#endif
if (!xt_init_memory()) /* Initialize memory */
......@@ -1129,7 +1132,7 @@ static int pbxt_init(void *p)
ASSERT(!pbxt_database);
{
THD *curr_thd = current_thd;
THD *thd = curr_thd;
THD *thd = NULL;
#ifndef DRIZZLED
extern myxt_mutex_t LOCK_plugin;
......@@ -1169,16 +1172,20 @@ static int pbxt_init(void *p)
xt_xres_start_database_recovery(self);
}
catch_(b) {
if (!curr_thd && thd)
myxt_destroy_thread(thd, FALSE);
#ifndef DRIZZLED
myxt_mutex_lock(&LOCK_plugin);
#endif
xt_throw(self);
/* It is possible that the error was reset by cleanup code.
* Set a generic error code in that case.
*/
/* PMC - This is not necessary in because exceptions are
* now preserved, in exception handler cleanup.
*/
if (!self->t_exception.e_xt_err)
xt_register_error(XT_REG_CONTEXT, XT_SYSTEM_ERROR, 0, "Initialization failed");
xt_log_exception(self, &self->t_exception, XT_LOG_DEFAULT);
init_err = 1;
}
cont_(b);
if (!curr_thd)
if (thd)
myxt_destroy_thread(thd, FALSE);
#ifndef DRIZZLED
myxt_mutex_lock(&LOCK_plugin);
......@@ -1237,8 +1244,8 @@ static int pbxt_init(void *p)
XT_RETURN(init_err);
error_3:
#ifdef XT_STREAMING
xt_exit_streaming();
#ifdef PBMS_ENABLED
pbms_finalize();
error_2:
#endif
......@@ -1295,9 +1302,6 @@ static int pbxt_close_connection(handlerton *hton, THD* thd)
{
#endif
XTThreadPtr self;
#ifdef XT_STREAMING
XTExceptionRec e;
#endif
XT_TRACE_CALL();
if ((self = (XTThreadPtr) *thd_ha_data(thd, hton))) {
......@@ -1308,10 +1312,6 @@ static int pbxt_close_connection(handlerton *hton, THD* thd)
xt_set_self(self);
xt_free_thread(self);
}
#ifdef XT_STREAMING
if (!xt_pbms_close_connection((void *) thd, &e))
xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
#endif
return 0;
}
......@@ -2301,6 +2301,14 @@ int ha_pbxt::write_row(byte *buf)
XT_PRINT1(pb_open_tab->ot_thread, "ha_pbxt::write_row %s\n", pb_share->sh_table_path->ps_path);
XT_DISABLED_TRACE(("INSERT tx=%d val=%d\n", (int) pb_open_tab->ot_thread->st_xact_data->xd_start_xn_id, (int) XT_GET_DISK_4(&buf[1])));
//statistic_increment(ha_write_count,&LOCK_status);
#ifdef PBMS_ENABLED
PBMSResultRec result;
err = pbms_write_row_blobs(table, buf, &result);
if (err) {
xt_logf(XT_NT_ERROR, "pbms_write_row_blobs() Error: %s", result.mr_message);
return err;
}
#endif
/* GOTCHA: I have a huge problem with the transaction statement.
* It is not ALWAYS committed (I mean ha_commit_trans() is
......@@ -2332,7 +2340,8 @@ int ha_pbxt::write_row(byte *buf)
int update_err = update_auto_increment();
if (update_err) {
ha_log_pbxt_thread_error_for_mysql(pb_ignore_dup_key);
return update_err;
err = update_err;
goto done;
}
set_auto_increment(table->next_number_field);
}
......@@ -2350,6 +2359,10 @@ int ha_pbxt::write_row(byte *buf)
pb_open_tab->ot_thread->st_update_id++;
}
done:
#ifdef PBMS_ENABLED
pbms_completed(table, (err == 0));
#endif
return err;
}
......@@ -2423,6 +2436,21 @@ int ha_pbxt::update_row(const byte * old_data, byte * new_data)
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
table->timestamp_field->set_time();
#ifdef PBMS_ENABLED
PBMSResultRec result;
err = pbms_delete_row_blobs(table, old_data, &result);
if (err) {
xt_logf(XT_NT_ERROR, "update_row:pbms_delete_row_blobs() Error: %s", result.mr_message);
return err;
}
err = pbms_write_row_blobs(table, new_data, &result);
if (err) {
xt_logf(XT_NT_ERROR, "update_row:pbms_write_row_blobs() Error: %s", result.mr_message);
goto pbms_done;
}
#endif
/* GOTCHA: We need to check the auto-increment value on update
* because of the following test (which fails for InnoDB) -
* auto_increment.test:
......@@ -2445,6 +2473,11 @@ int ha_pbxt::update_row(const byte * old_data, byte * new_data)
err = ha_log_pbxt_thread_error_for_mysql(pb_ignore_dup_key);
pb_open_tab->ot_table->tab_locks.xt_remove_temp_lock(pb_open_tab, TRUE);
#ifdef PBMS_ENABLED
pbms_done:
pbms_completed(table, (err == 0));
#endif
return err;
}
......@@ -2468,6 +2501,16 @@ int ha_pbxt::delete_row(const byte * buf)
XT_DISABLED_TRACE(("DELETE tx=%d val=%d\n", (int) pb_open_tab->ot_thread->st_xact_data->xd_start_xn_id, (int) XT_GET_DISK_4(&buf[1])));
//statistic_increment(ha_delete_count,&LOCK_status);
#ifdef PBMS_ENABLED
PBMSResultRec result;
err = pbms_delete_row_blobs(table, buf, &result);
if (err) {
xt_logf(XT_NT_ERROR, "pbms_delete_row_blobs() Error: %s", result.mr_message);
return err;
}
#endif
if (!pb_open_tab->ot_thread->st_stat_trans) {
trans_register_ha(pb_mysql_thd, FALSE, pbxt_hton);
XT_PRINT0(pb_open_tab->ot_thread, "ha_pbxt::delete_row trans_register_ha all=FALSE\n");
......@@ -2481,6 +2524,9 @@ int ha_pbxt::delete_row(const byte * buf)
pb_open_tab->ot_table->tab_locks.xt_remove_temp_lock(pb_open_tab, TRUE);
#ifdef PBMS_ENABLED
pbms_completed(table, (err == 0));
#endif
return err;
}
......@@ -3478,7 +3524,7 @@ int ha_pbxt::info(uint flag)
if (flag & HA_STATUS_VARIABLE) {
stats.deleted = ot->ot_table->tab_row_fnum;
stats.records = (ha_rows) (ot->ot_table->tab_row_eof_id - 1 - stats.deleted);
stats.data_file_length = ot->ot_table->tab_rec_eof_id;
stats.data_file_length = xt_rec_id_to_rec_offset(ot->ot_table, ot->ot_table->tab_rec_eof_id);
stats.index_file_length = xt_ind_node_to_offset(ot->ot_table, ot->ot_table->tab_ind_eof);
stats.delete_length = ot->ot_table->tab_rec_fnum * ot->ot_rec_size;
//check_time = info.check_time;
......@@ -4749,6 +4795,19 @@ int ha_pbxt::delete_table(const char *table_path)
#endif
}
cont_(a);
#ifdef PBMS_ENABLED
/* Call pbms_delete_table_with_blobs() last because it cannot be undone. */
if (!err) {
PBMSResultRec result;
if (pbms_delete_table_with_blobs(table_path, &result)) {
xt_logf(XT_NT_WARNING, "pbms_delete_table_with_blobs() Error: %s", result.mr_message);
}
pbms_completed(NULL, true);
}
#endif
return err;
}
......@@ -4809,6 +4868,16 @@ int ha_pbxt::rename_table(const char *from, const char *to)
XT_PRINT2(self, "ha_pbxt::rename_table %s -> %s\n", from, to);
#ifdef PBMS_ENABLED
PBMSResultRec result;
err = pbms_rename_table_with_blobs(from, to, &result);
if (err) {
xt_logf(XT_NT_ERROR, "pbms_rename_table_with_blobs() Error: %s", result.mr_message);
return err;
}
#endif
try_(a) {
xt_ha_open_database_of_table(self, (XTPathStrPtr) to);
to_db = self->st_database;
......@@ -4837,10 +4906,6 @@ int ha_pbxt::rename_table(const char *from, const char *to)
freer_(); // ha_release_exclusive_use(share)
freer_(); // ha_unget_share(share)
#ifdef XT_STREAMING
/* PBMS remove the table? */
xt_pbms_rename_table(from, to);
#endif
/*
* If there are no more PBXT tables in the database, we
* "drop the database", which deletes all PBXT resources
......@@ -4860,6 +4925,10 @@ int ha_pbxt::rename_table(const char *from, const char *to)
err = xt_ha_pbxt_thread_error_for_mysql(thd, self, pb_ignore_dup_key);
}
cont_(a);
#ifdef PBMS_ENABLED
pbms_completed(NULL, (err == 0));
#endif
XT_RETURN(err);
}
......
......@@ -3424,7 +3424,7 @@ static void idx_set_index_selectivity(XTThreadPtr self, XTOpenTablePtr ot, XTInd
ot->ot_ind_rhandle = NULL;
failed:
ot->ot_table->tab_dic.dic_disable_index = XT_INDEX_CORRUPTED;
xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
xt_log_and_clear_exception_ns();
return;
}
......
......@@ -52,9 +52,6 @@ extern pthread_key_t THR_Session;
#include "myxt_xt.h"
#include "strutil_xt.h"
#include "database_xt.h"
#ifdef XT_STREAMING
#include "streaming_xt.h"
#endif
#include "cache_xt.h"
#include "datalog_xt.h"
......@@ -2914,6 +2911,11 @@ xtPublic void myxt_static_convert_table_name(XTThreadPtr XT_UNUSED(self), char *
tablename_to_filename(from, to, to_len);
}
xtPublic void myxt_static_convert_file_name(char *from, char *to, size_t to_len)
{
filename_to_tablename(from, to, to_len);
}
xtPublic int myxt_strcasecmp(char * a, char *b)
{
return my_strcasecmp(&my_charset_utf8_general_ci, a, b);
......@@ -2945,88 +2947,6 @@ xtPublic MX_CHARSET_INFO *myxt_getcharset(bool convert)
return &my_charset_utf8_general_ci;
}
#ifdef XT_STREAMING
xtPublic xtBool myxt_use_blobs(XTOpenTablePtr ot, void **ret_pbms_table, xtWord1 *rec_buf)
{
void *pbms_table;
XTTable *tab = ot->ot_table;
u_int idx = 0;
Field *field;
char *blob_ref;
xtWord4 len;
char in_url[PBMS_BLOB_URL_SIZE];
char *out_url;
if (!xt_pbms_open_table(&pbms_table, tab->tab_name->ps_path))
return FAILED;
for (idx=0; idx<tab->tab_dic.dic_blob_count; idx++) {
field = tab->tab_dic.dic_blob_cols[idx];
if ((blob_ref = mx_get_length_and_data(field, (char *) rec_buf, &len)) && len) {
xt_strncpy(PBMS_BLOB_URL_SIZE, in_url, blob_ref, len);
if (!xt_pbms_use_blob(pbms_table, &out_url, in_url, field->field_index)) {
xt_pbms_close_table(pbms_table);
return FAILED;
}
if (out_url) {
len = strlen(out_url);
mx_set_length_and_data(field, (char *) rec_buf, len, out_url);
}
}
}
*ret_pbms_table = pbms_table;
return OK;
}
xtPublic void myxt_unuse_blobs(XTOpenTablePtr XT_UNUSED(ot), void *pbms_table)
{
xt_pbms_close_table(pbms_table);
}
xtPublic xtBool myxt_retain_blobs(XTOpenTablePtr XT_UNUSED(ot), void *pbms_table, xtRecordID rec_id)
{
xtBool ok;
PBMSEngineRefRec eng_ref;
memset(&eng_ref, 0, sizeof(PBMSEngineRefRec));
XT_SET_DISK_8(eng_ref.er_data, rec_id);
ok = xt_pbms_retain_blobs(pbms_table, &eng_ref);
xt_pbms_close_table(pbms_table);
return ok;
}
xtPublic void myxt_release_blobs(XTOpenTablePtr ot, xtWord1 *rec_buf, xtRecordID rec_id)
{
void *pbms_table;
XTTable *tab = ot->ot_table;
u_int idx = 0;
Field *field;
char *blob_ref;
xtWord4 len;
char in_url[PBMS_BLOB_URL_SIZE];
PBMSEngineRefRec eng_ref;
memset(&eng_ref, 0, sizeof(PBMSEngineRefRec));
XT_SET_DISK_8(eng_ref.er_data, rec_id);
if (!xt_pbms_open_table(&pbms_table, tab->tab_name->ps_path))
return;
for (idx=0; idx<tab->tab_dic.dic_blob_count; idx++) {
field = tab->tab_dic.dic_blob_cols[idx];
if ((blob_ref = mx_get_length_and_data(field, (char *) rec_buf, &len)) && len) {
xt_strncpy(PBMS_BLOB_URL_SIZE, in_url, blob_ref, len);
xt_pbms_release_blob(pbms_table, in_url, field->field_index, &eng_ref);
}
}
xt_pbms_close_table(pbms_table);
}
#endif // XT_STREAMING
xtPublic void *myxt_create_thread()
{
#ifdef DRIZZLED
......
......@@ -70,6 +70,7 @@ XTDDTable *myxt_create_table_from_table(XTThreadPtr self, STRUCT_TABLE *my_tab);
void myxt_static_convert_identifier(XTThreadPtr self, struct charset_info_st *cs, char *from, char *to, size_t to_len);
char *myxt_convert_identifier(XTThreadPtr self, struct charset_info_st *cs, char *from);
void myxt_static_convert_table_name(XTThreadPtr self, char *from, char *to, size_t to_len);
void myxt_static_convert_file_name(char *from, char *to, size_t to_len);
char *myxt_convert_table_name(XTThreadPtr self, char *from);
int myxt_strcasecmp(char * a, char *b);
int myxt_isspace(struct charset_info_st *cs, char a);
......@@ -78,13 +79,6 @@ int myxt_isdigit(struct charset_info_st *cs, char a);
struct charset_info_st *myxt_getcharset(bool convert);
#ifdef XT_STREAMING
xtBool myxt_use_blobs(XTOpenTablePtr ot, void **ret_pbms_table, xtWord1 *rec_buf);
void myxt_unuse_blobs(XTOpenTablePtr ot, void *pbms_table);
xtBool myxt_retain_blobs(XTOpenTablePtr ot, void *pbms_table, xtRecordID record);
void myxt_release_blobs(XTOpenTablePtr ot, xtWord1 *rec_buf, xtRecordID record);
#endif
void *myxt_create_thread();
void myxt_destroy_thread(void *thread, xtBool end_threads);
XTThreadPtr myxt_get_self();
......
......@@ -16,7 +16,8 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Paul McCullagh
* Original author: Paul McCullagh
* Continued development: Barry Leslie
* H&G2JCtL
*
* 2007-06-01
......@@ -37,21 +38,26 @@
#include <dirent.h>
#include <signal.h>
#include <ctype.h>
#include <errno.h>
#ifdef USE_PRAGMA_INTERFACE
#pragma interface /* gcc class implementation */
#endif
/* 2 10 1 10 20 10 10 20 20
* Format: "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
*/
//If URL_FMT changes do not forget to update couldBeURL() in this file.
#define URL_FMT "~*%lu%c%lu-%llu-%lx-%lu-%llu-%llu"
#define MS_SHARED_MEMORY_MAGIC 0x7E9A120C
#define MS_ENGINE_VERSION 1
#define MS_CALLBACK_VERSION 1
#define MS_SHARED_MEMORY_VERSION 1
#define MS_ENGINE_LIST_SIZE 80
#define MS_CALLBACK_VERSION 4
#define MS_SHARED_MEMORY_VERSION 2
#define MS_ENGINE_LIST_SIZE 10
#define MS_TEMP_FILE_PREFIX "pbms_temp_"
#define MS_TEMP_FILE_PREFIX "pbms_temp_"
#define MS_RESULT_MESSAGE_SIZE 300
#define MS_RESULT_STACK_SIZE 200
#define MS_BLOB_HANDLE_SIZE 300
......@@ -68,146 +74,81 @@
#define MS_ERR_UNKNOWN_DB 8
#define MS_ERR_REMOVING_REPO 9
#define MS_ERR_DATABASE_DELETED 10
#define MS_ERR_DUPLICATE 11 /* Attempt to insert a duplicate key into a system table. */
#define MS_ERR_INVALID_RECORD 12
#define MS_ERR_RECOVERY_IN_PROGRESS 13
#define MS_ERR_DUPLICATE_DB 14
#define MS_ERR_DUPLICATE_DB_ID 15
#define MS_ERR_INVALID_OPERATION 16
#define MS_LOCK_NONE 0
#define MS_LOCK_READONLY 1
#define MS_LOCK_READ_WRITE 2
#define MS_XACT_NONE 0
#define MS_XACT_BEGIN 1
#define MS_XACT_COMMIT 2
#define MS_XACT_ROLLBACK 3
#define PBMS_ENGINE_REF_LEN 8
#define PBMS_BLOB_URL_SIZE 200
#define PBMS_BLOB_URL_SIZE 120
#define PBMS_FIELD_COL_SIZE 128
#define PBMS_FIELD_COND_SIZE 300
#define MS_RESULT_MESSAGE_SIZE 300
#define MS_RESULT_STACK_SIZE 200
typedef struct PBMSResultRec {
int mr_code; /* Engine specific error code. */
char mr_message[MS_RESULT_MESSAGE_SIZE]; /* Error message, required if non-zero return code. */
char mr_stack[MS_RESULT_STACK_SIZE]; /* Trace information about where the error occurred. */
} PBMSResultRec, *PBMSResultPtr;
typedef struct PBMSBlobID {
u_int32_t bi_db_id;
u_int64_t bi_blob_size;
u_int64_t bi_blob_id; // or repo file offset if type = REPO
u_int64_t bi_blob_ref_id;
u_int32_t bi_tab_id; // or repo ID if type = REPO
u_int32_t bi_auth_code;
u_int32_t bi_blob_type;
} PBMSBlobIDRec, *PBMSBlobIDPtr;
typedef struct PBMSResultRec {
int mr_code; /* Engine specific error code. */
char mr_message[MS_RESULT_MESSAGE_SIZE]; /* Error message, required if non-zero return code. */
char mr_stack[MS_RESULT_STACK_SIZE]; /* Trace information about where the error occurred. */
} PBMSResultRec, *PBMSResultPtr;
typedef struct PBMSEngineRefRec {
unsigned char er_data[PBMS_ENGINE_REF_LEN];
} PBMSEngineRefRec, *PBMSEngineRefPtr;
typedef struct PBMSBlobURL {
char bu_data[PBMS_BLOB_URL_SIZE];
} PBMSBlobURLRec, *PBMSBlobURLPtr;
typedef struct PBMSFieldRef {
char fr_column[PBMS_FIELD_COL_SIZE];
char fr_cond[PBMS_FIELD_COND_SIZE];
} PBMSFieldRefRec, *PBMSFieldRefPtr;
/*
* The engine must free its resources for the given thread.
*/
typedef void (*MSCloseConnFunc)(void *thd);
/* Before access BLOBs of a table, the streaming engine will open the table.
* Open tables are managed as a pool by the streaming engine.
* When a request is received, the streaming engine will ask all
* registered engine to open the table. The engine must return a NULL
* open_table pointer if it does not handle the table.
* A callback allows an engine to request all open tables to be
* closed by the streaming engine.
*/
typedef int (*MSOpenTableFunc)(void *thd, const char *table_url, void **open_table, PBMSResultPtr result);
typedef void (*MSCloseTableFunc)(void *thd, void *open_table);
/*
* When the streaming engine wants to use an open table handle from the
* pool, it calls the lock table function.
*/
typedef int (*MSLockTableFunc)(void *thd, int *xact, void *open_table, int lock_type, PBMSResultPtr result);
typedef int (*MSUnlockTableFunc)(void *thd, int xact, void *open_table, PBMSResultPtr result);
/* This function is used to locate and send a BLOB on the given stream.
*/
typedef int (*MSSendBLOBFunc)(void *thd, void *open_table, const char *blob_column, const char *blob_url, void *stream, PBMSResultPtr result);
/*
* Lookup and engine reference, and return readable text.
*/
typedef int (*MSLookupRefFunc)(void *thd, void *open_table, unsigned short col_index, PBMSEngineRefPtr eng_ref, PBMSFieldRefPtr feild_ref, PBMSResultPtr result);
typedef struct PBMSEngineRec {
int ms_version; /* MS_ENGINE_VERSION */
int ms_index; /* The index into the engine list. */
int ms_removing; /* TRUE (1) if the engine is being removed. */
const char *ms_engine_name;
void *ms_engine_info;
MSCloseConnFunc ms_close_conn;
MSOpenTableFunc ms_open_table;
MSCloseTableFunc ms_close_table;
MSLockTableFunc ms_lock_table;
MSUnlockTableFunc ms_unlock_table;
MSSendBLOBFunc ms_send_blob;
MSLookupRefFunc ms_lookup_ref;
int ms_internal; /* TRUE (1) if the engine is supported directly in the mysq/drizzle handler code . */
char ms_engine_name[32];
} PBMSEngineRec, *PBMSEnginePtr;
/*
* This function should never be called directly, it is called
* by deregisterEngine() below.
*/
typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);
typedef void (*ECTableCloseAllFunc)(const char *table_url);
typedef int (*ECSetContentLenFunc)(void *stream, off_t len, PBMSResultPtr result);
typedef int (*ECWriteHeadFunc)(void *stream, PBMSResultPtr result);
typedef int (*ECWriteStreamFunc)(void *stream, void *buffer, size_t len, PBMSResultPtr result);
/*
* The engine should call this function from
* its own close connection function!
*/
typedef int (*ECCloseConnFunc)(void *thd, PBMSResultPtr result);
typedef void (*ECRegisterdFunc)(PBMSEnginePtr engine);
/*
* Call this function before retaining or releasing BLOBs in a row.
*/
typedef int (*ECOpenTableFunc)(void **open_table, char *table_path, PBMSResultPtr result);
typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);
/*
* Call this function when the operation is complete.
* Call this function to store a BLOB in the repository the BLOB's
* URL will be returned. The returned URL buffer is expected to be atleast
* PBMS_BLOB_URL_SIZE long.
*
* The BLOB URL must still be retained or it will automaticly be deleted after a timeout expires.
*/
typedef void (*ECCloseTableFunc)(void *open_table);
typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, char *blob_url, unsigned short col_index, PBMSResultPtr result);
/*
* Call this function for each BLOB to be retained. When a BLOB is used, the
* URL may be changed. The returned URL is valid as long as the the
* table is open.
* URL may be changed. The returned URL buffer is expected to be atleast
* PBMS_BLOB_URL_SIZE long.
*
* The returned URL must be inserted into the row in place of the given
* URL.
*/
typedef int (*ECUseBlobFunc)(void *open_table, char **ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
/*
* Reference Blobs that has been uploaded to the streaming engine.
*
* All BLOBs specified by the use blob function are retained by
* this function.
*
* The engine reference is a (unaligned) 8 byte value which
* identifies the row that the BLOBs are in.
*/
typedef int (*ECRetainBlobsFunc)(void *open_table, PBMSEngineRefPtr eng_ref, PBMSResultPtr result);
typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
/*
* If a row containing a BLOB is deleted, then the BLOBs in the
......@@ -216,27 +157,24 @@ typedef int (*ECRetainBlobsFunc)(void *open_table, PBMSEngineRefPtr eng_ref, PBM
* Note: if a table is dropped, all the BLOBs referenced by the
* table are automatically released.
*/
typedef int (*ECReleaseBlobFunc)(void *open_table, char *blob_url, unsigned short col_index, PBMSEngineRefPtr eng_ref, PBMSResultPtr result);
typedef int (*ECReleaseBlobFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result);
typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);
typedef int (*ECDropTable)(const char *table_path, PBMSResultPtr result);
typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result);
typedef int (*ECRenameTable)(const char *from_table, const char *to_table, PBMSResultPtr result);
typedef void (*ECCallCompleted)(bool built_in, bool ok);
typedef struct PBMSCallbacksRec {
int cb_version; /* MS_CALLBACK_VERSION */
ECRegisterdFunc cb_register;
ECDeregisterdFunc cb_deregister;
ECTableCloseAllFunc cb_table_close_all;
ECSetContentLenFunc cb_set_cont_len;
ECWriteHeadFunc cb_write_head;
ECWriteStreamFunc cb_write_stream;
ECCloseConnFunc cb_close_conn;
ECOpenTableFunc cb_open_table;
ECCloseTableFunc cb_close_table;
ECUseBlobFunc cb_use_blob;
ECRetainBlobsFunc cb_retain_blobs;
ECCreateBlobsFunc cb_create_blob;
ECRetainBlobsFunc cb_retain_blob;
ECReleaseBlobFunc cb_release_blob;
ECDropTable cb_drop_table;
ECRenameTable cb_rename_table;
ECCallCompleted cb_completed;
} PBMSCallbacksRec, *PBMSCallbacksPtr;
typedef struct PBMSSharedMemoryRec {
......@@ -251,30 +189,61 @@ typedef struct PBMSSharedMemoryRec {
PBMSEnginePtr sm_engine_list[MS_ENGINE_LIST_SIZE];
} PBMSSharedMemoryRec, *PBMSSharedMemoryPtr;
#ifndef PBMS_API
#ifndef PBMS_CLIENT_API
Please define he value of PBMS_API
#endif
#else
#ifdef PBMS_API
class PBMS_API
{
private:
const char *temp_prefix[3];
bool built_in;
public:
PBMS_API(): sharedMemory(NULL) {
int i = 0;
temp_prefix[i++] = MS_TEMP_FILE_PREFIX;
#ifdef MS_TEMP_FILE_PREFIX
temp_prefix[i++] = MS_TEMP_FILE_PREFIX;
#endif
temp_prefix[i++] = NULL;
}
~PBMS_API() { }
/*
* This method is called by the PBMS engine during startup.
*/
int PBMSStartup(PBMSCallbacksPtr callbacks, PBMSResultPtr result) {
int err;
deleteTempFiles();
err = getSharedMemory(true, result);
if (!err)
sharedMemory->sm_callbacks = callbacks;
return err;
}
/*
* This method is called by the PBMS engine during startup.
*/
void PBMSShutdown() {
if (!sharedMemory)
return;
lock();
sharedMemory->sm_callbacks = NULL;
bool empty = true;
for (int i=0; i<sharedMemory->sm_list_len && empty; i++) {
if (sharedMemory->sm_engine_list[i])
empty = false;
}
unlock();
if (empty)
removeSharedMemory();
}
/*
* Register the engine with the Stream Engine.
*/
......@@ -283,6 +252,7 @@ public:
deleteTempFiles();
// The first engine to register creates the shared memory.
if ((err = getSharedMemory(true, result)))
return err;
......@@ -292,6 +262,10 @@ public:
engine->ms_index = i;
if (i >= sharedMemory->sm_list_len)
sharedMemory->sm_list_len = i+1;
if (sharedMemory->sm_callbacks)
sharedMemory->sm_callbacks->cb_register(engine);
built_in = (engine->ms_internal == 1);
return MS_OK;
}
}
......@@ -322,7 +296,7 @@ public:
PBMSResultRec result;
int err;
if ((err = getSharedMemory(true, &result)))
if ((err = getSharedMemory(false, &result)))
return;
lock();
......@@ -342,207 +316,98 @@ public:
unlock();
if (empty) {
char temp_file[100];
sharedMemory->sm_magic = 0;
free(sharedMemory);
sharedMemory = NULL;
const char **prefix = temp_prefix;
while (*prefix) {
getTempFileName(temp_file, *prefix, getpid());
unlink(temp_file);
prefix++;
}
}
if (empty)
removeSharedMemory();
}
void closeAllTables(const char *table_url)
void removeSharedMemory()
{
PBMSResultRec result;
int err;
if ((err = getSharedMemory(true, &result)))
return;
const char **prefix = temp_prefix;
char temp_file[100];
// Do not remove the sharfed memory until after
// the PBMS engine has shutdown.
if (sharedMemory->sm_callbacks)
sharedMemory->sm_callbacks->cb_table_close_all(table_url);
}
int setContentLength(void *stream, off_t len, PBMSResultPtr result)
{
int err;
if ((err = getSharedMemory(true, result)))
return err;
return sharedMemory->sm_callbacks->cb_set_cont_len(stream, len, result);
}
int writeHead(void *stream, PBMSResultPtr result)
{
int err;
if ((err = getSharedMemory(true, result)))
return err;
return sharedMemory->sm_callbacks->cb_write_head(stream, result);
}
int writeStream(void *stream, void *buffer, size_t len, PBMSResultPtr result)
{
int err;
if ((err = getSharedMemory(true, result)))
return err;
return sharedMemory->sm_callbacks->cb_write_stream(stream, buffer, len, result);
}
int closeConn(void *thd, PBMSResultPtr result)
{
int err;
if ((err = getSharedMemory(true, result)))
return err;
if (!sharedMemory->sm_callbacks)
return MS_OK;
return sharedMemory->sm_callbacks->cb_close_conn(thd, result);
}
int openTable(void **open_table, char *table_path, PBMSResultPtr result)
{
int err;
if ((err = getSharedMemory(true, result)))
return err;
if (!sharedMemory->sm_callbacks) {
*open_table = NULL;
return MS_OK;
return;
sharedMemory->sm_magic = 0;
free(sharedMemory);
sharedMemory = NULL;
while (*prefix) {
getTempFileName(temp_file, *prefix, getpid());
unlink(temp_file);
prefix++;
}
return sharedMemory->sm_callbacks->cb_open_table(open_table, table_path, result);
}
int closeTable(void *open_table, PBMSResultPtr result)
{
int err;
if ((err = getSharedMemory(true, result)))
return err;
if (sharedMemory->sm_callbacks && open_table)
sharedMemory->sm_callbacks->cb_close_table(open_table);
return MS_OK;
}
int couldBeURL(char *blob_url)
/* ~*test/~1-150-2b5e0a7-0[*<blob size>][.ext] */
/* ~*test/_1-150-2b5e0a7-0[*<blob size>][.ext] */
{
char *ptr;
size_t len;
bool have_blob_size = false;
if (blob_url) {
if ((len = strlen(blob_url))) {
/* Too short: */
if (len <= 10)
return 0;
/* Required prefix: */
/* NOTE: ~> is deprecated v0.5.4+, now use ~* */
if (*blob_url != '~' || (*(blob_url + 1) != '>' && *(blob_url + 1) != '*'))
return 0;
ptr = blob_url + len - 1;
/* Allow for an optional extension: */
if (!isdigit(*ptr)) {
while (ptr > blob_url && *ptr != '/' && *ptr != '.')
ptr--;
if (ptr == blob_url || *ptr != '.')
return 0;
if (ptr == blob_url || !isdigit(*ptr))
return 0;
}
// field 1: server id OR blob size
do_again:
while (ptr > blob_url && isdigit(*ptr))
ptr--;
if (ptr != blob_url && *ptr == '*' && !have_blob_size) {
ptr--;
have_blob_size = true;
goto do_again;
}
if (ptr == blob_url || *ptr != '-')
return 0;
// field 2: Authoration code
ptr--;
if (!isxdigit(*ptr))
return 0;
while (ptr > blob_url && isxdigit(*ptr))
ptr--;
if (ptr == blob_url || *ptr != '-')
return 0;
// field 3:offset
ptr--;
if (!isxdigit(*ptr))
return 0;
while (ptr > blob_url && isdigit(*ptr))
ptr--;
if (ptr == blob_url || *ptr != '-')
return 0;
// field 4:Table id
ptr--;
if (!isdigit(*ptr))
return 0;
while (ptr > blob_url && isdigit(*ptr))
ptr--;
/* NOTE: ^ and : are deprecated v0.5.4+, now use ! and ~ */
if (ptr == blob_url || (*ptr != '^' && *ptr != ':' && *ptr != '_' && *ptr != '~'))
return 0;
ptr--;
if (ptr == blob_url || *ptr != '/')
return 0;
ptr--;
if (ptr == blob_url)
return 0;
return 1;
int couldBeURL(char *blob_url, int size)
{
if (blob_url && (size < PBMS_BLOB_URL_SIZE)) {
char buffer[PBMS_BLOB_URL_SIZE+1];
u_int32_t db_id = 0;
u_int32_t tab_id = 0;
u_int64_t blob_id = 0;
u_int64_t blob_ref_id = 0;
u_int64_t blob_size = 0;
u_int32_t auth_code = 0;
u_int32_t server_id = 0;
char type, junk[5];
int scanned;
junk[0] = 0;
if (blob_url[size]) { // There is no guarantee that the URL will be null terminated.
memcpy(buffer, blob_url, size);
buffer[size] = 0;
blob_url = buffer;
}
scanned = sscanf(blob_url, URL_FMT"%4s", &db_id, &type, &tab_id, &blob_id, &auth_code, &server_id, &blob_ref_id, &blob_size, junk);
if (scanned != 8) {// If junk is found at the end this will also result in an invalid URL.
printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]);
return 0;
}
if (junk[0] || (type != '~' && type != '_')) {
printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]);
return 0;
}
return 1;
}
return 0;
}
int useBlob(void *open_table, char **ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result)
int retainBlob(const char *db_name, const char *tab_name, char *ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
{
int err;
char safe_url[PBMS_BLOB_URL_SIZE+1];
if ((err = getSharedMemory(true, result)))
if ((err = getSharedMemory(false, result)))
return err;
if (!couldBeURL(blob_url)) {
*ret_blob_url = NULL;
return MS_OK;
if (!couldBeURL(blob_url, blob_size)) {
if (!sharedMemory->sm_callbacks) {
*ret_blob_url = 0;
return MS_OK;
}
err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, col_index, result);
if (err)
return err;
blob_url = ret_blob_url;
} else {
// Make sure the url is a C string:
if (blob_url[blob_size]) {
memcpy(safe_url, blob_url, blob_size);
safe_url[blob_size] = 0;
blob_url = safe_url;
}
}
if (!sharedMemory->sm_callbacks) {
result->mr_code = MS_ERR_INCORRECT_URL;
......@@ -551,64 +416,71 @@ public:
return MS_ERR_INCORRECT_URL;
}
return sharedMemory->sm_callbacks->cb_use_blob(open_table, ret_blob_url, blob_url, col_index, result);
return sharedMemory->sm_callbacks->cb_retain_blob(built_in, db_name, tab_name, ret_blob_url, blob_url, col_index, result);
}
int retainBlobs(void *open_table, PBMSEngineRefPtr eng_ref, PBMSResultPtr result)
int releaseBlob(const char *db_name, const char *tab_name, char *blob_url, size_t blob_size, PBMSResultPtr result)
{
int err;
char safe_url[PBMS_BLOB_URL_SIZE+1];
if ((err = getSharedMemory(true, result)))
if ((err = getSharedMemory(false, result)))
return err;
if (!sharedMemory->sm_callbacks)
return MS_OK;
return sharedMemory->sm_callbacks->cb_retain_blobs(open_table, eng_ref, result);
if (!couldBeURL(blob_url, blob_size))
return MS_OK;
if (blob_url[blob_size]) {
memcpy(safe_url, blob_url, blob_size);
safe_url[blob_size] = 0;
blob_url = safe_url;
}
return sharedMemory->sm_callbacks->cb_release_blob(built_in, db_name, tab_name, blob_url, result);
}
int releaseBlob(void *open_table, char *blob_url, unsigned short col_index, PBMSEngineRefPtr eng_ref, PBMSResultPtr result)
int dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
{
int err;
if ((err = getSharedMemory(true, result)))
if ((err = getSharedMemory(false, result)))
return err;
if (!sharedMemory->sm_callbacks)
return MS_OK;
if (!couldBeURL(blob_url))
return MS_OK;
return sharedMemory->sm_callbacks->cb_release_blob(open_table, blob_url, col_index, eng_ref, result);
return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
}
int dropTable(const char *table_path, PBMSResultPtr result)
int renameTable(const char *db_name, const char *from_table, const char *to_table, PBMSResultPtr result)
{
int err;
if ((err = getSharedMemory(true, result)))
if ((err = getSharedMemory(false, result)))
return err;
if (!sharedMemory->sm_callbacks)
return MS_OK;
return sharedMemory->sm_callbacks->cb_drop_table(table_path, result);
return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_table, result);
}
int renameTable(const char *from_table, const char *to_table, PBMSResultPtr result)
void completed(int ok)
{
int err;
PBMSResultRec result;
if ((err = getSharedMemory(true, result)))
return err;
if (getSharedMemory(false, &result))
return;
if (!sharedMemory->sm_callbacks)
return MS_OK;
return;
return sharedMemory->sm_callbacks->cb_rename_table(from_table, to_table, result);
sharedMemory->sm_callbacks->cb_completed(built_in, ok);
}
volatile PBMSSharedMemoryPtr sharedMemory;
private:
......@@ -618,7 +490,6 @@ private:
int r;
char temp_file[100];
const char **prefix = temp_prefix;
void *tmp_p = NULL;
if (sharedMemory)
return MS_OK;
......@@ -644,8 +515,7 @@ private:
}
buffer[tfer] = 0;
sscanf(buffer, "%p", &tmp_p);
sharedMemory = (PBMSSharedMemoryPtr) tmp_p;
sscanf(buffer, "%p", &sharedMemory);
if (!sharedMemory || sharedMemory->sm_magic != MS_SHARED_MEMORY_MAGIC) {
if (!create)
return MS_OK;
......@@ -661,9 +531,9 @@ private:
return setOSResult(errno, "fseek", temp_file, result);
}
sprintf(buffer, "%p", (void *) sharedMemory);
sprintf(buffer, "%p", sharedMemory);
tfer = write(tmp_f, buffer, strlen(buffer));
if (tfer != (ssize_t) strlen(buffer)) {
if (tfer != strlen(buffer)) {
close(tmp_f);
return setOSResult(errno, "write", temp_file, result);
}
......@@ -847,25 +717,25 @@ extern void PBMSDeinitBlobStreamingThread(void *v_bs_thread);
extern void PBMSGetError(void *v_bs_thread, PBMSResultPtr result);
/*
* PBMSCreateBlob():Creates a new blob in the database of the given size. cont_type can be NULL.
* PBMSCreateBlob():Creates a new blob in the database of the given size.
*/
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, char *cont_type, u_int64_t size);
extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size);
/*
* PBMSWriteBlob():Write the data to the blob in one or more chunks. The total size of all the chuncks of
* data written to the blob must match the size specified when the blob was created.
*/
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *database_name, char *data, size_t size, size_t offset);
extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset);
/*
* PBMSReadBlob():Read the blob data out of the blob in one or more chunks.
*/
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *database_name, char *buffer, size_t *size, size_t offset);
extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset);
/*
* PBMSIDToURL():Convert a blob id to a blob URL. The 'url' buffer must be atleast PBMS_BLOB_URL_SIZE bytes in size.
*/
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *database_name, char *url);
extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url);
/*
* PBMSIDToURL():Convert a blob URL to a blob ID.
......
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
*
* PrimeBase Media Stream for MySQL
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Barry Leslie
*
* 2009-07-16
*
* H&G2JCtL
*
* PBMS interface used to enable engines for use with the PBMS engine.
*
* For an example on how to build this into an engine have a look at the PBXT engine
* in file ha_pbxt.cc. Search for 'PBMS_ENABLED'.
*
*/
#define PBMS_API pbms_enabled_api
#include "pbms_enabled.h"
#ifdef DRIZZLED
#include <sys/stat.h>
#include <drizzled/common_includes.h>
#include <drizzled/plugin.h>
#else
#include "mysql_priv.h"
#include <mysql/plugin.h>
#define session_alloc(sess, size) thd_alloc(sess, size);
#define current_session current_thd
#endif
#define GET_BLOB_FIELD(t, i) (Field_blob *)(t->field[t->s->blob_field[i]])
#define DB_NAME(f) (f->table->s->db.str)
#define TAB_NAME(f) (*(f->table_name))
static PBMS_API pbms_api;
PBMSEngineRec enabled_engine = {
MS_ENGINE_VERSION
};
//====================
bool pbms_initialize(const char *engine_name, bool isServer, PBMSResultPtr result)
{
int err;
strncpy(enabled_engine.ms_engine_name, engine_name, 32);
enabled_engine.ms_internal = isServer;
enabled_engine.ms_engine_name[31] = 0;
err = pbms_api.registerEngine(&enabled_engine, result);
return (err == 0);
}
//====================
void pbms_finalize()
{
pbms_api.deregisterEngine(&enabled_engine);
}
//====================
int pbms_write_row_blobs(TABLE *table, uchar *row_buffer, PBMSResultPtr result)
{
Field_blob *field;
char *blob_rec, *blob;
size_t packlength, i, org_length, length;
char blob_url_buffer[PBMS_BLOB_URL_SIZE];
int err;
String type_name;
if (table->s->blob_fields == 0)
return 0;
for (i= 0; i < table->s->blob_fields; i++) {
field = GET_BLOB_FIELD(table, i);
// Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
field->sql_type(type_name);
if (strcasecmp(type_name.c_ptr(), "LongBlob"))
continue;
// Get the blob record:
blob_rec = (char *)row_buffer + field->offset(field->table->record[0]);
packlength = field->pack_length() - field->table->s->blob_ptr_size;
memcpy(&blob, blob_rec +packlength, sizeof(char*));
org_length = field->get_length((uchar *)blob_rec);
// Signal PBMS to record a new reference to the BLOB.
// If 'blob' is not a BLOB URL then it will be stored in the repositor as a new BLOB
// and a reference to it will be created.
err = pbms_api.retainBlob(DB_NAME(field), TAB_NAME(field), blob_url_buffer, blob, org_length, field->field_index, result);
if (err)
return err;
// If the BLOB length changed reset it.
// This will happen if the BLOB data was replaced with a BLOB reference.
length = strlen(blob_url_buffer) +1;
if ((length != org_length) || memcmp(blob_url_buffer, blob, length)) {
if (length != org_length) {
field->store_length((uchar *)blob_rec, packlength, length);
}
if (length > org_length) {
// This can only happen if the BLOB URL is actually larger than the BLOB itself.
blob = (char *) session_alloc(current_session, length);
memcpy(blob_rec+packlength, &blob, sizeof(char*));
}
memcpy(blob, blob_url_buffer, length);
}
}
return 0;
}
//====================
int pbms_delete_row_blobs(TABLE *table, const uchar *row_buffer, PBMSResultPtr result)
{
Field_blob *field;
const char *blob_rec;
char *blob;
size_t packlength, i, length;
int err;
String type_name;
if (table->s->blob_fields == 0)
return 0;
for (i= 0; i < table->s->blob_fields; i++) {
field = GET_BLOB_FIELD(table, i);
// Note: field->type() always returns MYSQL_TYPE_BLOB regardless of the type of BLOB
field->sql_type(type_name);
if (strcasecmp(type_name.c_ptr(), "LongBlob"))
continue;
// Get the blob record:
blob_rec = (char *)row_buffer + field->offset(field->table->record[0]);
packlength = field->pack_length() - field->table->s->blob_ptr_size;
length = field->get_length((uchar *)blob_rec);
memcpy(&blob, blob_rec +packlength, sizeof(char*));
// Signal PBMS to delete the reference to the BLOB.
err = pbms_api.releaseBlob(DB_NAME(field), TAB_NAME(field), blob, length, result);
if (err)
return err;
}
return 0;
}
#define MAX_NAME_SIZE 64
static void parse_table_path(const char *path, char *db_name, char *tab_name)
{
const char *ptr = path + strlen(path) -1, *eptr;
int len;
*db_name = *tab_name = 0;
while ((ptr > path) && (*ptr != '/'))ptr --;
if (*ptr != '/')
return;
strncpy(tab_name, ptr+1, MAX_NAME_SIZE);
tab_name[MAX_NAME_SIZE-1] = 0;
eptr = ptr;
ptr--;
while ((ptr > path) && (*ptr != '/'))ptr --;
if (*ptr != '/')
return;
ptr++;
len = eptr - ptr;
if (len >= MAX_NAME_SIZE)
len = MAX_NAME_SIZE-1;
memcpy(db_name, ptr, len);
db_name[len] = 0;
}
//====================
int pbms_rename_table_with_blobs(const char *old_table_path, const char *new_table_path, PBMSResultPtr result)
{
char o_db_name[MAX_NAME_SIZE], n_db_name[MAX_NAME_SIZE], o_tab_name[MAX_NAME_SIZE], n_tab_name[MAX_NAME_SIZE];
parse_table_path(old_table_path, o_db_name, o_tab_name);
parse_table_path(new_table_path, n_db_name, n_tab_name);
if (strcmp(o_db_name, n_db_name)) {
result->mr_code = MS_ERR_INVALID_OPERATION;
strcpy(result->mr_message, "PBMS does not support renaming tables across databases.");
strcpy(result->mr_stack, "pbms_rename_table_with_blobs()");
return MS_ERR_INVALID_OPERATION;
}
return pbms_api.renameTable(o_db_name, o_tab_name, n_tab_name, result);
}
//====================
int pbms_delete_table_with_blobs(const char *table_path, PBMSResultPtr result)
{
char db_name[MAX_NAME_SIZE], tab_name[MAX_NAME_SIZE];
parse_table_path(table_path, db_name, tab_name);
return pbms_api.dropTable(db_name, tab_name, result);
}
//====================
void pbms_completed(TABLE *table, bool ok)
{
if ((!table) || (table->s->blob_fields != 0))
pbms_api.completed(ok) ;
return ;
}
/* Copyright (c) 2009 PrimeBase Technologies GmbH, Germany
*
* PrimeBase Media Stream for MySQL
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* Barry Leslie
*
* 2009-07-16
*
* H&G2JCtL
*
* PBMS interface used to enable engines for use with the PBMS engine.
*
* For an example on how to build this into an engine have a look at the PBXT engine
* in file ha_pbxt.cc. Search for 'PBMS_ENABLED'.
*
*/
#ifndef __PBMS_ENABLED_H__
#define __PBMS_ENABLED_H__
#include "pbms.h"
#ifdef DRIZZLED
#include <drizzled/server_includes.h>
#define TABLE Table
#else
#include <mysql_priv.h>
#endif
/*
* pbms_initialize() should be called from the engines plugIn's 'init()' function.
* The engine_name is the name of your engine, "PBXT" or "InnoDB" for example.
*
* The isServer flag indicates if this entire server is being enabled. This is only
* true if this is being built into the server's handler code above the engine level
* calls.
*/
extern bool pbms_initialize(const char *engine_name, bool isServer, PBMSResultPtr result);
/*
* pbms_finalize() should be called from the engines plugIn's 'deinit()' function.
*/
extern void pbms_finalize();
/*
* pbms_write_row_blobs() should be called from the engine's 'write_row' function.
* It can alter the row data so it must be called before any other function using the row data.
* It should also be called from engine's 'update_row' function for the new row.
*
* pbms_completed() must be called after calling pbms_write_row_blobs() and just before
* returning from write_row() to indicate if the operation completed successfully.
*/
extern int pbms_write_row_blobs(TABLE *table, uchar *buf, PBMSResultPtr result);
/*
* pbms_delete_row_blobs() should be called from the engine's 'delete_row' function.
* It should also be called from engine's 'update_row' function for the old row.
*
* pbms_completed() must be called after calling pbms_delete_row_blobs() and just before
* returning from delete_row() to indicate if the operation completed successfully.
*/
extern int pbms_delete_row_blobs(TABLE *table, const uchar *buf, PBMSResultPtr result);
/*
* pbms_rename_table_with_blobs() should be called from the engine's 'rename_table' function.
*
* NOTE: Renaming tables across databases is not supported.
*
* pbms_completed() must be called after calling pbms_rename_table_with_blobs() and just before
* returning from rename_table() to indicate if the operation completed successfully.
*/
extern int pbms_rename_table_with_blobs(const char *old_table_path, const char *new_table_path, PBMSResultPtr result);
/*
* pbms_delete_table_with_blobs() should be called from the engine's 'delete_table' function.
*
* NOTE: Currently pbms_delete_table_with_blobs() cannot be undone so it should only
* be called after the host engine has performed successfully drop it's table.
*
* pbms_completed() must be called after calling pbms_delete_table_with_blobs() and just before
* returning from delete_table() to indicate if the operation completed successfully.
*/
extern int pbms_delete_table_with_blobs(const char *table_path, PBMSResultPtr result);
/*
* pbms_completed() must be called to indicate success or failure of a an operation after having
* called pbms_write_row_blobs(), pbms_delete_row_blobs(), pbms_rename_table_with_blobs(), or
* pbms_delete_table_with_blobs().
*
* pbms_completed() has the effect of committing or rolling back the changes made if the session
* is in 'autocommit' mode.
*/
extern void pbms_completed(TABLE *table, bool ok);
#endif
......@@ -395,20 +395,31 @@ int xt_p_cond_timedwait(xt_cond_type *cond, xt_mutex_type *mt, struct timespec *
int xt_p_join(pthread_t thread, void **value)
{
switch (WaitForSingleObject(thread, INFINITE)) {
case WAIT_OBJECT_0:
case WAIT_TIMEOUT:
/* Don't do this! According to the Win docs:
* _endthread automatically closes the thread handle
* (whereas _endthreadex does not). Therefore, when using
* _beginthread and _endthread, do not explicitly close the
* thread handle by calling the Win32 CloseHandle API.
CloseHandle(thread);
*/
break;
case WAIT_FAILED:
return GetLastError();
DWORD exitcode;
while(1) {
switch (WaitForSingleObject(thread, 10000)) {
case WAIT_OBJECT_0:
return 0;
case WAIT_TIMEOUT:
/* Don't do this! According to the Win docs:
* _endthread automatically closes the thread handle
* (whereas _endthreadex does not). Therefore, when using
* _beginthread and _endthread, do not explicitly close the
* thread handle by calling the Win32 CloseHandle API.
CloseHandle(thread);
*/
/* This is done so that if the thread was not [yet] in the running
* state when this function was called we won't deadlock here.
*/
if (GetExitCodeThread(thread, &exitcode) && (exitcode == STILL_ACTIVE))
break;
return 0;
case WAIT_FAILED:
return GetLastError();
}
}
return 0;
}
......
......@@ -496,7 +496,7 @@ static xtBool xres_add_index_entries(XTOpenTablePtr ot, xtRowID row_id, xtRecord
/* TODO: Write something to the index header to indicate that
* it is corrupted.
*/
tab->tab_dic.dic_disable_index = XT_INDEX_CORRUPTED;
xt_tab_disable_index(ot->ot_table, XT_INDEX_CORRUPTED);
xt_log_and_clear_exception_ns();
return OK;
}
......@@ -629,6 +629,9 @@ static void xres_apply_change(XTThreadPtr self, XTOpenTablePtr ot, XTXactLogBuff
xtWord1 *rec_data = NULL;
XTTabRecFreeDPtr free_data;
if (tab->tab_dic.dic_key_count == 0)
check_index = FALSE;
switch (record->xl.xl_status_1) {
case XT_LOG_ENT_REC_MODIFIED:
case XT_LOG_ENT_UPDATE:
......@@ -642,7 +645,7 @@ static void xres_apply_change(XTThreadPtr self, XTOpenTablePtr ot, XTXactLogBuff
/* This should be done before we apply change to table, as otherwise we lose
* the key value that we need to remove from index
*/
if (check_index && ot->ot_table->tab_dic.dic_key_count && record->xl.xl_status_1 == XT_LOG_ENT_REC_MODIFIED) {
if (check_index && record->xl.xl_status_1 == XT_LOG_ENT_REC_MODIFIED) {
if ((rec_data = xres_load_record(self, ot, rec_id, NULL, 0, rec_buf, tab->tab_dic.dic_ind_cols_req)))
xres_remove_index_entries(ot, rec_id, rec_data);
}
......@@ -652,7 +655,7 @@ static void xres_apply_change(XTThreadPtr self, XTOpenTablePtr ot, XTXactLogBuff
xt_throw(self);
tab->tab_bytes_to_flush += len;
if (check_index && ot->ot_table->tab_dic.dic_key_count) {
if (check_index) {
switch (record->xl.xl_status_1) {
case XT_LOG_ENT_DELETE:
case XT_LOG_ENT_DELETE_BG:
......@@ -851,9 +854,6 @@ static void xres_apply_change(XTThreadPtr self, XTOpenTablePtr ot, XTXactLogBuff
goto do_rec_freed;
record_loaded = TRUE;
}
#ifdef XT_STREAMING
myxt_release_blobs(ot, rec_data, rec_id);
#endif
}
if (record->xl.xl_status_1 == XT_LOG_ENT_REC_REMOVED_EXT) {
......@@ -959,31 +959,12 @@ static void xres_apply_change(XTThreadPtr self, XTOpenTablePtr ot, XTXactLogBuff
if (check_index) {
cols_required = tab->tab_dic.dic_ind_cols_req;
#ifdef XT_STREAMING
if (tab->tab_dic.dic_blob_cols_req > cols_required)
cols_required = tab->tab_dic.dic_blob_cols_req;
#endif
if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))
goto go_on_to_free;
record_loaded = TRUE;
xres_remove_index_entries(ot, rec_id, rec_data);
}
#ifdef XT_STREAMING
if (tab->tab_dic.dic_blob_count) {
if (!record_loaded) {
cols_required = tab->tab_dic.dic_blob_cols_req;
if (!(rec_data = xres_load_record(self, ot, rec_id, &record->rb.rb_rec_type_1, rec_size, rec_buf, cols_required)))
/* [(7)] REMOVE is followed by FREE:
goto get_rec_offset;
*/
goto go_on_to_free;
record_loaded = TRUE;
}
myxt_release_blobs(ot, rec_data, rec_id);
}
#endif
if (data_log_id && data_log_offset && log_over_size) {
if (!ot->ot_thread->st_dlog_buf.dlb_delete_log(data_log_id, data_log_offset, log_over_size, tab->tab_id, rec_id, self)) {
if (ot->ot_thread->t_exception.e_xt_err != XT_ERR_BAD_EXT_RECORD &&
......
/* Copyright (c) 2005 PrimeBase Technologies GmbH
*
* PrimeBase XT
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* 2006-06-07 Paul McCullagh
*
* H&G2JCtL
*
* This file contains PBXT streaming interface.
*/
#include "xt_config.h"
#ifdef XT_STREAMING
#include "ha_pbxt.h"
#include "thread_xt.h"
#include "strutil_xt.h"
#include "table_xt.h"
#include "myxt_xt.h"
#include "xaction_xt.h"
#include "database_xt.h"
#include "streaming_xt.h"
extern PBMSEngineRec pbxt_engine;
static PBMS_API pbxt_streaming;
/* ----------------------------------------------------------------------
* INIT & EXIT
*/
xtPublic xtBool xt_init_streaming(void)
{
XTThreadPtr self = NULL;
int err;
PBMSResultRec result;
if ((err = pbxt_streaming.registerEngine(&pbxt_engine, &result))) {
xt_logf(XT_CONTEXT, XT_LOG_ERROR, "%s\n", result.mr_message);
return FAILED;
}
return OK;
}
xtPublic void xt_exit_streaming(void)
{
pbxt_streaming.deregisterEngine(&pbxt_engine);
}
/* ----------------------------------------------------------------------
* UTILITY FUNCTIONS
*/
static void str_result_to_exception(XTExceptionPtr e, int r, PBMSResultPtr result)
{
char *str, *end_str;
e->e_xt_err = r;
e->e_sys_err = result->mr_code;
xt_strcpy(XT_ERR_MSG_SIZE, e->e_err_msg, result->mr_message);
e->e_source_line = 0;
str = result->mr_stack;
if ((end_str = strchr(str, '('))) {
xt_strcpy_term(XT_MAX_FUNC_NAME_SIZE, e->e_func_name, str, '(');
str = end_str+1;
if ((end_str = strchr(str, ':'))) {
xt_strcpy_term(XT_SOURCE_FILE_NAME_SIZE, e->e_source_file, str, ':');
str = end_str+1;
if ((end_str = strchr(str, ')'))) {
char number[40];
xt_strcpy_term(40, number, str, ')');
e->e_source_line = atol(number);
str = end_str+1;
if (*str == '\n')
str++;
}
}
}
if (e->e_source_line == 0) {
*e->e_func_name = 0;
*e->e_source_file = 0;
xt_strcpy(XT_ERR_MSG_SIZE, e->e_catch_trace, result->mr_stack);
}
else
xt_strcpy(XT_ERR_MSG_SIZE, e->e_catch_trace, str);
}
static void str_exception_to_result(XTExceptionPtr e, PBMSResultPtr result)
{
int len;
if (e->e_sys_err)
result->mr_code = e->e_sys_err;
else
result->mr_code = e->e_xt_err;
xt_strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, e->e_err_msg);
xt_strcpy(MS_RESULT_STACK_SIZE, result->mr_stack, e->e_func_name);
xt_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, "(");
xt_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, e->e_source_file);
xt_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, ":");
xt_strcati(MS_RESULT_STACK_SIZE, result->mr_stack, (int) e->e_source_line);
xt_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, ")");
len = strlen(result->mr_stack);
if (strncmp(result->mr_stack, e->e_catch_trace, len) == 0)
xt_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, e->e_catch_trace + len);
else {
xt_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, "\n");
xt_strcat(MS_RESULT_STACK_SIZE, result->mr_stack, e->e_catch_trace);
}
}
static XTIndexPtr str_find_index(XTTableHPtr tab, u_int *col_list, u_int col_cnt)
{
u_int i, j;
XTIndexPtr *ind; /* MySQL/PBXT key description */
ind = tab->tab_dic.dic_keys;
for (i=0; i<tab->tab_dic.dic_key_count; i++) {
if ((*ind)->mi_seg_count == col_cnt) {
for (j=0; j<(*ind)->mi_seg_count; j++) {
if ((*ind)->mi_seg[j].col_idx != col_list[j])
goto loop;
}
return *ind;
}
loop:
ind++;
}
return NULL;
}
static XTThreadPtr str_set_current_thread(THD *thd, PBMSResultPtr result)
{
XTThreadPtr self;
XTExceptionRec e;
if (!(self = xt_ha_set_current_thread(thd, &e))) {
str_exception_to_result(&e, result);
return NULL;
}
return self;
}
/* ----------------------------------------------------------------------
* BLOB STREAMING INTERFACE
*/
static void pbxt_close_conn(void *thread)
{
xt_ha_close_connection((THD *) thread);
}
static int pbxt_open_table(void *thread, const char *table_url, void **open_table, PBMSResultPtr result)
{
THD *thd = (THD *) thread;
XTThreadPtr self;
XTTableHPtr tab = NULL;
XTOpenTablePtr ot = NULL;
int err = MS_OK;
if (!(self = str_set_current_thread(thd, result)))
return MS_ERR_ENGINE;
try_(a) {
xt_ha_open_database_of_table(self, (XTPathStrPtr) table_url);
if (!(tab = xt_use_table(self, (XTPathStrPtr) table_url, FALSE, TRUE, NULL))) {
err = MS_ERR_UNKNOWN_TABLE;
goto done;
}
if (!(ot = xt_open_table(tab)))
throw_();
ot->ot_thread = self;
done:;
}
catch_(a) {
str_exception_to_result(&self->t_exception, result);
err = MS_ERR_ENGINE;
}
cont_(a);
if (tab)
xt_heap_release(self, tab);
*open_table = ot;
return err;
}
static void pbxt_close_table(void *thread, void *open_table_ptr)
{
THD *thd = (THD *) thread;
volatile XTThreadPtr self, new_self = NULL;
XTOpenTablePtr ot = (XTOpenTablePtr) open_table_ptr;
XTExceptionRec e;
if (thd) {
if (!(self = xt_ha_set_current_thread(thd, &e))) {
xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
return;
}
}
else if (!(self = xt_get_self())) {
if (!(new_self = xt_create_thread("TempForClose", FALSE, TRUE, &e))) {
xt_log_exception(NULL, &e, XT_LOG_DEFAULT);
return;
}
self = new_self;
}
ot->ot_thread = self;
try_(a) {
xt_close_table(ot, TRUE, FALSE);
}
catch_(a) {
xt_log_and_clear_exception(self);
}
cont_(a);
if (new_self)
xt_free_thread(self);
}
static int pbxt_lock_table(void *thread, int *xact, void *open_table, int lock_type, PBMSResultPtr result)
{
THD *thd = (THD *) thread;
XTThreadPtr self;
XTOpenTablePtr ot = (XTOpenTablePtr) open_table;
int err = MS_OK;
if (!(self = str_set_current_thread(thd, result)))
return MS_ERR_ENGINE;
if (lock_type != MS_LOCK_NONE) {
try_(a) {
xt_ha_open_database_of_table(self, ot->ot_table->tab_name);
ot->ot_thread = self;
}
catch_(a) {
str_exception_to_result(&self->t_exception, result);
err = MS_ERR_ENGINE;
}
cont_(a);
}
if (!err && *xact == MS_XACT_BEGIN) {
if (self->st_xact_data)
*xact = MS_XACT_NONE;
else {
if (xt_xn_begin(self)) {
*xact = MS_XACT_COMMIT;
}
else {
str_exception_to_result(&self->t_exception, result);
err = MS_ERR_ENGINE;
}
}
}
return err;
}
static int pbxt_unlock_table(void *thread, int xact, void *XT_UNUSED(open_table), PBMSResultPtr result)
{
THD *thd = (THD *) thread;
XTThreadPtr self = xt_ha_thd_to_self(thd);
int err = MS_OK;
if (xact == MS_XACT_COMMIT) {
if (!xt_xn_commit(self)) {
str_exception_to_result(&self->t_exception, result);
err = MS_ERR_ENGINE;
}
}
else if (xact == MS_XACT_ROLLBACK) {
xt_xn_rollback(self);
}
return err;
}
static int pbxt_send_blob(void *thread, void *open_table, const char *blob_column, const char *blob_url_p, void *stream, PBMSResultPtr result)
{
THD *thd = (THD *) thread;
XTThreadPtr self = xt_ha_thd_to_self(thd);
XTOpenTablePtr ot = (XTOpenTablePtr) open_table;
int err = MS_OK;
u_int blob_col_idx, col_idx;
char col_name[XT_IDENTIFIER_NAME_SIZE];
XTStringBufferRec value;
u_int col_list[XT_MAX_COLS_PER_INDEX];
u_int col_cnt;
char col_names[XT_ERR_MSG_SIZE - 200];
XTIdxSearchKeyRec search_key;
XTIndexPtr ind;
char *blob_data;
size_t blob_len;
const char *blob_url = blob_url_p;
memset(&value, 0, sizeof(value));
*col_names = 0;
ot->ot_thread = self;
try_(a) {
if (ot->ot_row_wbuf_size < ot->ot_table->tab_dic.dic_mysql_buf_size) {
xt_realloc(self, (void **) &ot->ot_row_wbuffer, ot->ot_table->tab_dic.dic_mysql_buf_size);
ot->ot_row_wbuf_size = ot->ot_table->tab_dic.dic_mysql_buf_size;
}
xt_strcpy_url(XT_IDENTIFIER_NAME_SIZE, col_name, blob_column);
if (!myxt_find_column(ot, &blob_col_idx, col_name))
xt_throw_tabcolerr(XT_CONTEXT, XT_ERR_COLUMN_NOT_FOUND, ot->ot_table->tab_name, blob_column);
/* Prepare a row for the condition: */
const char *ptr;
col_cnt = 0;
while (*blob_url) {
ptr = xt_strchr(blob_url, '=');
xt_strncpy_url(XT_IDENTIFIER_NAME_SIZE, col_name, blob_url, (size_t) (ptr - blob_url));
if (!myxt_find_column(ot, &col_idx, col_name))
xt_throw_tabcolerr(XT_CONTEXT, XT_ERR_COLUMN_NOT_FOUND, ot->ot_table->tab_name, col_name);
if (*col_names)
xt_strcat(sizeof(col_names), col_names, ", ");
xt_strcat(sizeof(col_names), col_names, col_name);
blob_url = ptr;
if (*blob_url == '=')
blob_url++;
ptr = xt_strchr(blob_url, '&');
value.sb_len = 0;
xt_sb_concat_url_len(self, &value, blob_url, (size_t) (ptr - blob_url));
blob_url = ptr;
if (*blob_url == '&')
blob_url++;
if (!myxt_set_column(ot, (char *) ot->ot_row_rbuffer, col_idx, value.sb_cstring, value.sb_len))
xt_throw_tabcolerr(XT_CONTEXT, XT_ERR_CONVERSION, ot->ot_table->tab_name, col_name);
if (col_cnt < XT_MAX_COLS_PER_INDEX) {
col_list[col_cnt] = col_idx;
col_cnt++;
}
}
/* Find a matching index: */
if (!(ind = str_find_index(ot->ot_table, col_list, col_cnt)))
xt_throw_ixterr(XT_CONTEXT, XT_ERR_NO_MATCHING_INDEX, col_names);
search_key.sk_key_value.sv_flags = 0;
search_key.sk_key_value.sv_rec_id = 0;
search_key.sk_key_value.sv_row_id = 0;
search_key.sk_key_value.sv_key = search_key.sk_key_buf;
search_key.sk_key_value.sv_length = myxt_create_key_from_row(ind, search_key.sk_key_buf, ot->ot_row_rbuffer, NULL);
search_key.sk_on_key = FALSE;
if (!xt_idx_search(ot, ind, &search_key))
xt_throw(self);
if (!ot->ot_curr_rec_id)
xt_throw_taberr(XT_CONTEXT, XT_ERR_NO_ROWS, ot->ot_table->tab_name);
while (ot->ot_curr_rec_id) {
if (!search_key.sk_on_key)
xt_throw_taberr(XT_CONTEXT, XT_ERR_NO_ROWS, ot->ot_table->tab_name);
retry:
/* X TODO - Check if the write buffer is big enough here! */
switch (xt_tab_read_record(ot, ot->ot_row_wbuffer)) {
case FALSE:
if (xt_idx_next(ot, ind, &search_key))
break;
case XT_ERR:
xt_throw(self);
case XT_NEW:
if (xt_idx_match_search(ot, ind, &search_key, ot->ot_row_wbuffer, XT_S_MODE_MATCH))
goto success;
if (!xt_idx_next(ot, ind, &search_key))
xt_throw(self);
break;
case XT_RETRY:
goto retry;
default:
goto success;
}
}
success:
myxt_get_column_data(ot, (char *) ot->ot_row_wbuffer, blob_col_idx, &blob_data, &blob_len);
/*
* Write the content length, then write the HTTP
* header, and then the content.
*/
err = pbxt_streaming.setContentLength(stream, blob_len, result);
if (!err)
err = pbxt_streaming.writeHead(stream, result);
if (!err)
err = pbxt_streaming.writeStream(stream, (void *) blob_data, blob_len, result);
}
catch_(a) {
str_exception_to_result(&self->t_exception, result);
if (result->mr_code == XT_ERR_NO_ROWS)
err = MS_ERR_NOT_FOUND;
else
err = MS_ERR_ENGINE;
}
cont_(a);
if (ot->ot_ind_rhandle) {
xt_ind_release_handle(ot->ot_ind_rhandle, FALSE, self);
ot->ot_ind_rhandle = NULL;
}
xt_sb_set_size(NULL, &value, 0);
return err;
}
int pbxt_lookup_ref(void *thread, void *open_table, unsigned short col_index, PBMSEngineRefPtr eng_ref, PBMSFieldRefPtr field_ref, PBMSResultPtr result)
{
THD *thd = (THD *) thread;
XTThreadPtr self = xt_ha_thd_to_self(thd);
XTOpenTablePtr ot = (XTOpenTablePtr) open_table;
int err = MS_OK;
u_int i, len;
char *data;
XTIndexPtr ind = NULL;
ot->ot_thread = self;
if (ot->ot_row_wbuf_size < ot->ot_table->tab_dic.dic_mysql_buf_size) {
xt_realloc(self, (void **) &ot->ot_row_wbuffer, ot->ot_table->tab_dic.dic_mysql_buf_size);
ot->ot_row_wbuf_size = ot->ot_table->tab_dic.dic_mysql_buf_size;
}
ot->ot_curr_rec_id = (xtRecordID) XT_GET_DISK_8(eng_ref->er_data);
switch (xt_tab_dirty_read_record(ot, ot->ot_row_wbuffer)) {
case FALSE:
err = MS_ERR_ENGINE;
break;
default:
break;
}
if (err) {
str_exception_to_result(&self->t_exception, result);
goto exit;
}
myxt_get_column_name(ot, col_index, PBMS_FIELD_COL_SIZE, field_ref->fr_column);
for (i=0; i<ot->ot_table->tab_dic.dic_key_count; i++) {
ind = ot->ot_table->tab_dic.dic_keys[i];
if (ind->mi_flags & (HA_UNIQUE_CHECK | HA_NOSAME))
break;
}
if (ind) {
len = 0;
data = field_ref->fr_cond;
for (i=0; i<ind->mi_seg_count; i++) {
if (i > 0) {
xt_strcat(PBMS_FIELD_COND_SIZE, data, "&");
len = strlen(data);
}
myxt_get_column_name(ot, ind->mi_seg[i].col_idx, PBMS_FIELD_COND_SIZE - len, data + len);
len = strlen(data);
xt_strcat(PBMS_FIELD_COND_SIZE, data, "=");
len = strlen(data);
myxt_get_column_as_string(ot, (char *) ot->ot_row_wbuffer, ind->mi_seg[i].col_idx, PBMS_FIELD_COND_SIZE - len, data + len);
len = strlen(data);
}
}
else
xt_strcpy(PBMS_FIELD_COND_SIZE, field_ref->fr_cond, "*no unique key*");
exit:
return err;
}
PBMSEngineRec pbxt_engine = {
MS_ENGINE_VERSION,
0,
FALSE,
"PBXT",
NULL,
pbxt_close_conn,
pbxt_open_table,
pbxt_close_table,
pbxt_lock_table,
pbxt_unlock_table,
pbxt_send_blob,
pbxt_lookup_ref
};
/* ----------------------------------------------------------------------
* CALL IN FUNCTIONS
*/
xtPublic void xt_pbms_close_all_tables(const char *table_url)
{
pbxt_streaming.closeAllTables(table_url);
}
xtPublic xtBool xt_pbms_close_connection(void *thd, XTExceptionPtr e)
{
PBMSResultRec result;
int err;
err = pbxt_streaming.closeConn(thd, &result);
if (err) {
str_result_to_exception(e, err, &result);
return FAILED;
}
return OK;
}
xtPublic xtBool xt_pbms_open_table(void **open_table, char *table_path)
{
PBMSResultRec result;
int err;
err = pbxt_streaming.openTable(open_table, table_path, &result);
if (err) {
XTThreadPtr thread = xt_get_self();
str_result_to_exception(&thread->t_exception, err, &result);
return FAILED;
}
return OK;
}
xtPublic void xt_pbms_close_table(void *open_table)
{
PBMSResultRec result;
int err;
err = pbxt_streaming.closeTable(open_table, &result);
if (err) {
XTThreadPtr thread = xt_get_self();
str_result_to_exception(&thread->t_exception, err, &result);
xt_log_exception(thread, &thread->t_exception, XT_LOG_DEFAULT);
}
}
xtPublic xtBool xt_pbms_use_blob(void *open_table, char **ret_blob_url, char *blob_url, unsigned short col_index)
{
PBMSResultRec result;
int err;
err = pbxt_streaming.useBlob(open_table, ret_blob_url, blob_url, col_index, &result);
if (err) {
XTThreadPtr thread = xt_get_self();
str_result_to_exception(&thread->t_exception, err, &result);
return FAILED;
}
return OK;
}
xtPublic xtBool xt_pbms_retain_blobs(void *open_table, PBMSEngineRefPtr eng_ref)
{
PBMSResultRec result;
int err;
err = pbxt_streaming.retainBlobs(open_table, eng_ref, &result);
if (err) {
XTThreadPtr thread = xt_get_self();
str_result_to_exception(&thread->t_exception, err, &result);
return FAILED;
}
return OK;
}
xtPublic void xt_pbms_release_blob(void *open_table, char *blob_url, unsigned short col_index, PBMSEngineRefPtr eng_ref)
{
PBMSResultRec result;
int err;
err = pbxt_streaming.releaseBlob(open_table, blob_url, col_index, eng_ref, &result);
if (err) {
XTThreadPtr thread = xt_get_self();
str_result_to_exception(&thread->t_exception, err, &result);
xt_log_exception(thread, &thread->t_exception, XT_LOG_DEFAULT);
}
}
xtPublic void xt_pbms_drop_table(const char *table_path)
{
PBMSResultRec result;
int err;
err = pbxt_streaming.dropTable(table_path, &result);
if (err) {
XTThreadPtr thread = xt_get_self();
str_result_to_exception(&thread->t_exception, err, &result);
xt_log_exception(thread, &thread->t_exception, XT_LOG_DEFAULT);
}
}
xtPublic void xt_pbms_rename_table(const char *from_table, const char *to_table)
{
PBMSResultRec result;
int err;
err = pbxt_streaming.renameTable(from_table, to_table, &result);
if (err) {
XTThreadPtr thread = xt_get_self();
str_result_to_exception(&thread->t_exception, err, &result);
xt_log_exception(thread, &thread->t_exception, XT_LOG_DEFAULT);
}
}
#endif // XT_STREAMING
/* Copyright (c) 2005 PrimeBase Technologies GmbH
*
* PrimeBase XT
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* 2006-06-07 Paul McCullagh
*
* H&G2JCtL
*
* This file contains PBXT streaming interface.
*/
#ifndef __streaming_xt_h__
#define __streaming_xt_h__
#include "xt_defs.h"
#define PBMS_API pbms_api_PBXT
#include "pbms.h"
xtBool xt_init_streaming(void);
void xt_exit_streaming(void);
void xt_pbms_close_all_tables(const char *table_url);
xtBool xt_pbms_close_connection(void *thd, XTExceptionPtr e);
xtBool xt_pbms_open_table(void **open_table, char *table_path);
void xt_pbms_close_table(void *open_table);
xtBool xt_pbms_use_blob(void *open_table, char **ret_blob_url, char *blob_url, unsigned short col_index);
xtBool xt_pbms_retain_blobs(void *open_table, PBMSEngineRefPtr eng_ref);
void xt_pbms_release_blob(void *open_table, char *blob_url, unsigned short col_index, PBMSEngineRefPtr eng_ref);
void xt_pbms_drop_table(const char *table_path);
void xt_pbms_rename_table(const char *from_table, const char *to_table);
#endif
......@@ -368,7 +368,7 @@ xtPublic void xt_int8_to_byte_size(xtInt8 value, char *string)
/* Version number must also be set in configure.in! */
xtPublic c_char *xt_get_version(void)
{
return "1.0.08 RC";
return "1.0.08c RC";
}
/* Copy and URL decode! */
......
......@@ -47,9 +47,6 @@
#include "myxt_xt.h"
#include "cache_xt.h"
#include "trace_xt.h"
#ifdef XT_STREAMING
#include "streaming_xt.h"
#endif
#include "index_xt.h"
#include "restart_xt.h"
#include "systab_xt.h"
......@@ -410,9 +407,11 @@ xtPublic void xt_tab_init_db(XTThreadPtr self, XTDatabaseHPtr db)
{
XTTableDescRec desc;
XTTableEntryRec te_tab;
XTTableEntryPtr te_ptr;
XTTablePathPtr db_path;
char pbuf[PATH_MAX];
char pbuf[PATH_MAX];
int len;
u_int edx;
enter_();
pushr_(xt_tab_exit_db, db);
......@@ -487,13 +486,29 @@ xtPublic void xt_tab_init_db(XTThreadPtr self, XTDatabaseHPtr db)
desc.td_tab_path->tp_tab_count++;
te_tab.te_table = NULL;
xt_sl_insert(self, db->db_table_by_id, &desc.td_tab_id, &te_tab);
}
freer_(); // xt_describe_tables_exit(&desc)
xt_strcpy(PATH_MAX, pbuf, desc.td_tab_path->tp_path);
/*
* The purpose of this code is to ensure that all tables are opened and cached,
* which is actually only required if tables have foreign key references.
*
* In other words, a side affect of this code is that FK references between tables
* are registered, and checked.
*
* Unfortunately we don't know if a table is referenced by a FK, so we have to open
* all tables.
*
* Cannot open tables in the loop above because db->db_table_by_id which is built
* above is used by xt_use_table_no_lock()
*/
xt_enum_tables_init(&edx);
while ((te_ptr = xt_enum_tables_next(self, db, &edx))) {
xt_strcpy(PATH_MAX, pbuf, te_ptr->te_tab_path->tp_path);
xt_add_dir_char(PATH_MAX, pbuf);
xt_strcat(PATH_MAX, pbuf, desc.td_tab_name);
xt_strcat(PATH_MAX, pbuf, te_ptr->te_tab_name);
xt_heap_release(self, xt_use_table_no_lock(self, db, (XTPathStrPtr)pbuf, FALSE, FALSE, NULL, NULL));
}
freer_(); // xt_describe_tables_exit(&desc)
popr_(); // Discard xt_tab_exit_db(db)
exit_();
......@@ -733,6 +748,12 @@ static xtBool tab_find_table(XTThreadPtr self, XTDatabaseHPtr db, XTPathStrPtr n
return FALSE;
}
xtPublic void xt_tab_disable_index(XTTableHPtr tab, u_int ind_error)
{
tab->tab_dic.dic_disable_index = ind_error;
xt_tab_set_table_repair_pending(tab);
}
xtPublic void xt_tab_set_index_error(XTTableHPtr tab)
{
switch (tab->tab_dic.dic_disable_index) {
......@@ -830,18 +851,18 @@ static void tab_load_index_header(XTThreadPtr self, XTTableHPtr tab, XTOpenFileP
break;
default:
if (tab->tab_dic.dic_index_ver < XT_IND_CURRENT_VERSION)
tab->tab_dic.dic_disable_index = XT_INDEX_TOO_OLD;
xt_tab_disable_index(tab, XT_INDEX_TOO_OLD);
else
tab->tab_dic.dic_disable_index = XT_INDEX_TOO_NEW;
xt_tab_disable_index(tab, XT_INDEX_TOO_NEW);
break;
}
}
else if (tab->tab_index_page_size != XT_INDEX_PAGE_SIZE)
tab->tab_dic.dic_disable_index = XT_INDEX_BAD_BLOCK;
xt_tab_disable_index(tab, XT_INDEX_BAD_BLOCK);
}
else {
memset(tab->tab_index_head, 0, XT_INDEX_HEAD_SIZE);
tab->tab_dic.dic_disable_index = XT_INDEX_MISSING;
xt_tab_disable_index(tab, XT_INDEX_MISSING);
tab->tab_index_header_size = XT_INDEX_HEAD_SIZE;
tab->tab_index_page_size = XT_INDEX_PAGE_SIZE;
tab->tab_dic.dic_index_ver = 0;
......@@ -1112,6 +1133,8 @@ static int tab_new_handle(XTThreadPtr self, XTTableHPtr *r_tab, XTDatabaseHPtr d
xt_heap_set_release_callback(self, tab, tab_onrelease);
tab->tab_repair_pending = xt_tab_is_table_repair_pending(tab);
popr_(); // Discard xt_heap_release(tab)
xt_ht_put(self, db->db_tables, tab);
......@@ -1239,11 +1262,6 @@ static XTOpenTablePoolPtr tab_lock_table(XTThreadPtr self, XTPathStrPtr name, xt
return_(NULL);
}
#ifdef XT_STREAMING
/* Tell PBMS to close all open tables of this sort: */
xt_pbms_close_all_tables(name->ps_path);
#endif
/* Wait for all open tables to close: */
xt_db_wait_for_open_tables(self, table_pool);
......@@ -1320,9 +1338,6 @@ xtPublic void xt_create_table(XTThreadPtr self, XTPathStrPtr name, XTDictionaryP
/* Remove the PBMS table: */
ASSERT(xt_get_self() == self);
#ifdef XT_STREAMING
xt_pbms_drop_table(name->ps_path);
#endif
/* Remove the table from the directory. It will get a new
* ID so the handle in the directory will no longer be valid.
......@@ -1645,9 +1660,6 @@ xtPublic void xt_drop_table(XTThreadPtr self, XTPathStrPtr tab_name, xtBool drop
tab_delete_table_files(self, tab_name, tab_id);
ASSERT(xt_get_self() == self);
#ifdef XT_STREAMING
xt_pbms_drop_table(tab_name->ps_path);
#endif
if ((te_ptr = (XTTableEntryPtr) xt_sl_find(self, db->db_table_by_id, &tab_id))) {
tab_remove_table_path(self, db, te_ptr->te_tab_path);
xt_sl_delete(self, db->db_table_by_id, &tab_id);
......@@ -1764,6 +1776,7 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot)
u_llong max_comp_rec_len = 0;
size_t rec_size;
size_t row_size;
u_llong ext_data_len = 0;
#if defined(DUMP_CHECK_TABLE) || defined(CHECK_TABLE_STATS)
printf("\nCHECK TABLE: %s\n", tab->tab_name->ps_path);
......@@ -1863,6 +1876,7 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot)
printf("record-X ");
#endif
alloc_rec_count++;
ext_data_len += XT_GET_DISK_4(rec_buf->re_log_dat_siz_4);
row_size = XT_GET_DISK_4(rec_buf->re_log_dat_siz_4) + ot->ot_rec_size - XT_REC_EXT_HEADER_SIZE;
alloc_rec_bytes += row_size;
if (!min_comp_rec_len || row_size < min_comp_rec_len)
......@@ -1918,6 +1932,9 @@ xtPublic void xt_check_table(XTThreadPtr self, XTOpenTablePtr ot)
}
#ifdef CHECK_TABLE_STATS
if (!tab->tab_dic.dic_rec_fixed)
printf("Extendend data length = %llu\n", ext_data_len);
if (alloc_rec_count) {
printf("Minumum comp. rec. len. = %llu\n", (u_llong) min_comp_rec_len);
printf("Average comp. rec. len. = %llu\n", (u_llong) ((double) alloc_rec_bytes / (double) alloc_rec_count + (double) 0.5));
......@@ -2086,6 +2103,8 @@ xtPublic void xt_rename_table(XTThreadPtr self, XTPathStrPtr old_name, XTPathStr
popr_(); // Discard xt_free(te_new_name);
tab = xt_use_table_no_lock(self, db, new_name, FALSE, FALSE, &dic, NULL);
/* All renamed tables are considered repaired! */
xt_tab_table_repaired(tab);
xt_heap_release(self, tab);
freer_(); // myxt_free_dictionary(&dic)
......@@ -3708,49 +3727,6 @@ xtPublic int xt_tab_remove_record(XTOpenTablePtr ot, xtRecordID rec_id, xtWord1
}
}
#ifdef XT_STREAMING
if (tab->tab_dic.dic_blob_count) {
/* If the record contains any LONGBLOB then check how much
* space we need.
*/
size_t blob_size;
switch (old_rec_type) {
case XT_TAB_STATUS_DELETE:
case XT_TAB_STATUS_DEL_CLEAN:
break;
case XT_TAB_STATUS_FIXED:
case XT_TAB_STATUS_FIX_CLEAN:
/* Should not be the case, record with LONGBLOB can never be fixed! */
break;
case XT_TAB_STATUS_VARIABLE:
case XT_TAB_STATUS_VAR_CLEAN:
cols_req = tab->tab_dic.dic_blob_cols_req;
cols_in_buffer = cols_req;
blob_size = myxt_load_row_length(ot, rec_size - XT_REC_FIX_HEADER_SIZE, ot->ot_row_rbuffer + XT_REC_FIX_HEADER_SIZE, &cols_in_buffer);
if (cols_in_buffer < cols_req)
blob_size = tab->tab_dic.dic_rec_size;
else
blob_size += XT_REC_FIX_HEADER_SIZE;
if (blob_size > rec_size)
rec_size = blob_size;
break;
case XT_TAB_STATUS_EXT_DLOG:
case XT_TAB_STATUS_EXT_CLEAN:
cols_req = tab->tab_dic.dic_blob_cols_req;
cols_in_buffer = cols_req;
blob_size = myxt_load_row_length(ot, rec_size - XT_REC_EXT_HEADER_SIZE, ot->ot_row_rbuffer + XT_REC_EXT_HEADER_SIZE, &cols_in_buffer);
if (cols_in_buffer < cols_req)
blob_size = tab->tab_dic.dic_rec_size;
else
blob_size += XT_REC_EXT_HEADER_SIZE;
if (blob_size > rec_size)
rec_size = blob_size;
break;
}
}
#endif
set_removed:
if (XT_REC_IS_EXT_DLOG(old_rec_type)) {
/* {LOCK-EXT-REC} Lock, and read again to make sure that the
......@@ -4387,15 +4363,6 @@ xtPublic xtBool xt_tab_new_record(XTOpenTablePtr ot, xtWord1 *rec_buf)
xtRowID row_id;
u_int idx_cnt = 0;
XTIndexPtr *ind;
#ifdef XT_STREAMING
void *pbms_table;
/* PBMS: Reference BLOBs!? */
if (tab->tab_dic.dic_blob_count) {
if (!myxt_use_blobs(ot, &pbms_table, rec_buf))
return FAILED;
}
#endif
if (!myxt_store_row(ot, &rec_info, (char *) rec_buf))
goto failed_0;
......@@ -4430,17 +4397,6 @@ xtPublic xtBool xt_tab_new_record(XTOpenTablePtr ot, xtWord1 *rec_buf)
}
}
#ifdef XT_STREAMING
/* Reference the BLOBs in the row: */
if (tab->tab_dic.dic_blob_count) {
if (!myxt_retain_blobs(ot, pbms_table, rec_info.ri_rec_id)) {
pbms_table = NULL;
goto failed_2;
}
pbms_table = NULL;
}
#endif
/* Do the foreign key stuff: */
if (ot->ot_table->tab_dic.dic_table->dt_fkeys.size() > 0) {
if (!ot->ot_table->tab_dic.dic_table->insertRow(ot, rec_buf))
......@@ -4461,10 +4417,6 @@ xtPublic xtBool xt_tab_new_record(XTOpenTablePtr ot, xtWord1 *rec_buf)
tab_free_row_on_fail(ot, tab, row_id);
failed_0:
#ifdef XT_STREAMING
if (tab->tab_dic.dic_blob_count && pbms_table)
myxt_unuse_blobs(ot, pbms_table);
#endif
return FAILED;
}
......@@ -4568,15 +4520,6 @@ static xtBool tab_overwrite_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtWor
xtLogOffset log_offset;
xtBool prev_ext_rec;
#ifdef XT_STREAMING
void *pbms_table;
if (tab->tab_dic.dic_blob_count) {
if (!myxt_use_blobs(ot, &pbms_table, after_buf))
return FAILED;
}
#endif
if (!myxt_store_row(ot, &rec_info, (char *) after_buf))
goto failed_0;
......@@ -4640,16 +4583,6 @@ static xtBool tab_overwrite_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtWor
if (prev_ext_rec)
tab_free_ext_record_on_fail(ot, rec_id, &prev_rec_head, TRUE);
#ifdef XT_STREAMING
if (tab->tab_dic.dic_blob_count) {
/* Retain the BLOBs new record: */
if (!myxt_retain_blobs(ot, pbms_table, rec_id))
return FAILED;
/* Release the BLOBs in the old record: */
myxt_release_blobs(ot, before_buf, rec_id);
}
#endif
return OK;
failed_2:
......@@ -4692,11 +4625,6 @@ static xtBool tab_overwrite_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtWor
tab_free_ext_record_on_fail(ot, rec_id, &prev_rec_head, TRUE);
failed_0:
#ifdef XT_STREAMING
/* Unuse the BLOBs of the new record: */
if (tab->tab_dic.dic_blob_count && pbms_table)
myxt_unuse_blobs(ot, pbms_table);
#endif
return FAILED;
}
......@@ -4710,10 +4638,6 @@ xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtW
u_int idx_cnt = 0;
XTIndexPtr *ind;
#ifdef XT_STREAMING
void *pbms_table;
#endif
/*
* Originally only the flag ot->ot_curr_updated was checked, and if it was on, then
* tab_overwrite_record() was called, but this caused crashes in some cases like:
......@@ -4753,14 +4677,6 @@ xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtW
row_id = ot->ot_curr_row_id;
self = ot->ot_thread;
#ifdef XT_STREAMING
/* PBMS: Reference BLOBs!? */
if (tab->tab_dic.dic_blob_count) {
if (!myxt_use_blobs(ot, &pbms_table, after_buf))
return FAILED;
}
#endif
if (!myxt_store_row(ot, &rec_info, (char *) after_buf))
goto failed_0;
......@@ -4810,17 +4726,6 @@ xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtW
}
}
#ifdef XT_STREAMING
/* Reference the BLOBs in the row: */
if (tab->tab_dic.dic_blob_count) {
if (!myxt_retain_blobs(ot, pbms_table, rec_info.ri_rec_id)) {
pbms_table = NULL;
goto failed_2;
}
pbms_table = NULL;
}
#endif
if (ot->ot_table->tab_dic.dic_table->dt_trefs || ot->ot_table->tab_dic.dic_table->dt_fkeys.size() > 0) {
if (!ot->ot_table->tab_dic.dic_table->updateRow(ot, before_buf, after_buf))
goto failed_2;
......@@ -4837,10 +4742,6 @@ xtPublic xtBool xt_tab_update_record(XTOpenTablePtr ot, xtWord1 *before_buf, xtW
XT_TAB_ROW_UNLOCK(&tab->tab_row_rwlock[row_id % XT_ROW_RWLOCKS], ot->ot_thread);
failed_0:
#ifdef XT_STREAMING
if (tab->tab_dic.dic_blob_count && pbms_table)
myxt_unuse_blobs(ot, pbms_table);
#endif
return FAILED;
}
......@@ -5166,3 +5067,166 @@ xtPublic xtBool xt_tab_seq_next(XTOpenTablePtr ot, xtWord1 *buffer, xtBool *eof)
return FAILED;
}
/*
* -----------------------------------------------------------------------
* REPAIR TABLE
*/
#define REP_FIND 0
#define REP_ADD 1
#define REP_DEL 2
static xtBool tab_exec_repair_pending(XTDatabaseHPtr db, int what, char *table_name)
{
XTThreadPtr thread = xt_get_self();
char file_path[PATH_MAX];
XTOpenFilePtr of = NULL;
int len;
char *buffer = NULL, *ptr, *name;
char ch;
xtBool found = FALSE;
xt_strcpy(PATH_MAX, file_path, db->db_main_path);
xt_add_pbxt_file(PATH_MAX, file_path, "repair-pending");
if (what == REP_ADD) {
if (!xt_open_file_ns(&of, file_path, XT_FS_CREATE | XT_FS_MAKE_PATH))
return FALSE;
}
else {
if (!xt_open_file_ns(&of, file_path, XT_FS_DEFAULT))
return FALSE;
}
if (!of)
return FALSE;
len = (int) xt_seek_eof_file(NULL, of);
if (!(buffer = (char *) xt_malloc_ns(len + 1)))
goto failed;
if (!xt_pread_file(of, 0, len, len, buffer, NULL, &thread->st_statistics.st_x, thread))
goto failed;
buffer[len] = 0;
ptr = buffer;
for(;;) {
name = ptr;
while (*ptr && *ptr != '\n' && *ptr != '\r')
ptr++;
if (ptr > name) {
ch = *ptr;
*ptr = 0;
if (xt_tab_compare_names(name, table_name) == 0) {
*ptr = ch;
found = TRUE;
break;
}
*ptr = ch;
}
if (!*ptr)
break;
ptr++;
}
switch (what) {
case REP_ADD:
if (!found) {
/* Remove any trailing empty lines: */
while (len > 0) {
if (buffer[len-1] != '\n' && buffer[len-1] != '\r')
break;
len--;
}
if (len > 0) {
if (!xt_pwrite_file(of, len, 1, (void *) "\n", &thread->st_statistics.st_x, thread))
goto failed;
len++;
}
if (!xt_pwrite_file(of, len, strlen(table_name), table_name, &thread->st_statistics.st_x, thread))
goto failed;
len += strlen(table_name);
if (!xt_set_eof_file(NULL, of, len))
goto failed;
}
break;
case REP_DEL:
if (found) {
if (*ptr != '\0')
ptr++;
memmove(name, ptr, len - (ptr - buffer));
len = len - (ptr - name);
/* Remove trailing empty lines: */
while (len > 0) {
if (buffer[len-1] != '\n' && buffer[len-1] != '\r')
break;
len--;
}
if (len > 0) {
if (!xt_pwrite_file(of, 0, len, buffer, &thread->st_statistics.st_x, thread))
goto failed;
if (!xt_set_eof_file(NULL, of, len))
goto failed;
}
}
break;
}
xt_close_file_ns(of);
xt_free_ns(buffer);
if (len == 0)
xt_fs_delete(NULL, file_path);
return found;
failed:
if (of)
xt_close_file_ns(of);
if (buffer)
xt_free_ns(buffer);
xt_log_and_clear_exception(thread);
return FALSE;
}
xtPublic void tab_make_table_name(XTTableHPtr tab, char *table_name, size_t size)
{
char name_buf[XT_IDENTIFIER_NAME_SIZE*3+3];
xt_2nd_last_name_of_path(sizeof(name_buf), name_buf, tab->tab_name->ps_path);
myxt_static_convert_file_name(name_buf, table_name, size);
xt_strcat(size, table_name, ".");
myxt_static_convert_file_name(xt_last_name_of_path(tab->tab_name->ps_path), name_buf, sizeof(name_buf));
xt_strcat(size, table_name, name_buf);
}
xtPublic xtBool xt_tab_is_table_repair_pending(XTTableHPtr tab)
{
char table_name[XT_IDENTIFIER_NAME_SIZE*3+3];
tab_make_table_name(tab, table_name, sizeof(table_name));
return tab_exec_repair_pending(tab->tab_db, REP_FIND, table_name);
}
xtPublic void xt_tab_table_repaired(XTTableHPtr tab)
{
if (tab->tab_repair_pending) {
char table_name[XT_IDENTIFIER_NAME_SIZE*3+3];
tab->tab_repair_pending = FALSE;
tab_make_table_name(tab, table_name, sizeof(table_name));
tab_exec_repair_pending(tab->tab_db, REP_DEL, table_name);
}
}
xtPublic void xt_tab_set_table_repair_pending(XTTableHPtr tab)
{
if (!tab->tab_repair_pending) {
char table_name[XT_IDENTIFIER_NAME_SIZE*3+3];
tab->tab_repair_pending = TRUE;
tab_make_table_name(tab, table_name, sizeof(table_name));
tab_exec_repair_pending(tab->tab_db, REP_ADD, table_name);
}
}
......@@ -333,6 +333,7 @@ typedef struct XTTable : public XTHeap {
/* Values that belong in the header when flushed! */
xtBool tab_flush_pending; /* TRUE if the table needs to be flushed */
xtBool tab_recovery_done; /* TRUE if the table has been recovered */
xtBool tab_repair_pending; /* TRUE if the table has been marked for repair */
xtBool tab_temporary; /* TRUE if this is a temporary table {TEMP-TABLES}. */
off_t tab_bytes_to_flush; /* Number of bytes of the record/row files to flush. */
......@@ -562,8 +563,13 @@ xtBool xt_tab_put_eof_rec_data(XTOpenTablePtr ot, xtRecordID rec_id, size_t s
xtBool xt_tab_put_log_op_rec_data(XTOpenTablePtr ot, u_int status, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *buffer);
xtBool xt_tab_put_log_rec_data(XTOpenTablePtr ot, u_int status, xtRecordID free_rec_id, xtRecordID rec_id, size_t size, xtWord1 *buffer, xtOpSeqNo *op_seq);
xtBool xt_tab_get_rec_data(register XTOpenTablePtr ot, xtRecordID rec_id, size_t size, xtWord1 *buffer);
void xt_tab_disable_index(XTTableHPtr tab, u_int ind_error);
void xt_tab_set_index_error(XTTableHPtr tab);
xtBool xt_tab_is_table_repair_pending(XTTableHPtr tab);
void xt_tab_table_repaired(XTTableHPtr tab);
void xt_tab_set_table_repair_pending(XTTableHPtr tab);
inline off_t xt_row_id_to_row_offset(register XTTableHPtr tab, xtRowID row_id)
{
return (off_t) tab->tab_rows.tci_header_size + (off_t) (row_id - 1) * (off_t) tab->tab_rows.tci_rec_size;
......@@ -631,3 +637,4 @@ inline xtIndexNodeID xt_ind_offset_to_node(register XTTableHPtr tab, off_t ind_o
while (0)
#endif
......@@ -1132,6 +1132,7 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
/* [(8)] Flush the compactor log. */
xt_lock_mutex_ns(&xl_db->db_co_dlog_lock);
if (!xl_db->db_co_thread->st_dlog_buf.dlb_flush_log(TRUE, thread)) {
xl_log_bytes_written -= part_size;
xt_unlock_mutex_ns(&xl_db->db_co_dlog_lock);
goto write_failed;
}
......@@ -1140,8 +1141,10 @@ xtBool XTDatabaseLog::xlog_append(XTThreadPtr thread, size_t size1, xtWord1 *dat
/* And flush if required: */
flush_time = thread->st_statistics.st_xlog.ts_flush_time;
if (!xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread))
if (!xt_flush_file(xl_log_file, &thread->st_statistics.st_xlog, thread)) {
xl_log_bytes_written -= part_size;
goto write_failed;
}
xl_last_flush_time = (u_int) (thread->st_statistics.st_xlog.ts_flush_time - flush_time);
xl_log_bytes_flushed = xl_log_bytes_written;
......@@ -2514,9 +2517,6 @@ static void xlog_wr_main(XTThreadPtr self)
if (!record) {
break;
}
/* Count the number of bytes read from the log: */
db->db_xlog.xl_log_bytes_read += ws->ws_seqread.xseq_record_len;
switch (record->xl.xl_status_1) {
case XT_LOG_ENT_HEADER:
break;
......@@ -2540,6 +2540,8 @@ static void xlog_wr_main(XTThreadPtr self)
xt_xres_apply_in_order(self, ws, ws->ws_seqread.xseq_rec_log_id, ws->ws_seqread.xseq_rec_log_offset, record);
break;
}
/* Count the number of bytes read from the log: */
db->db_xlog.xl_log_bytes_read += ws->ws_seqread.xseq_record_len;
}
}
......
......@@ -81,7 +81,7 @@ const int max_connections = 500;
#define DEBUG
#endif // _DEBUG
#else
#define XT_STREAMING
#define PBMS_ENABLED
#endif
#ifdef __FreeBSD__
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment