Commit 5ba094f8 authored by claes's avatar claes

Replaces old not yet sent messages with new one if one arrives

parent c7965a28
/*
* Proview $Id: rt_qmon.c,v 1.8 2006-02-14 05:28:09 claes Exp $
* Proview $Id: rt_qmon.c,v 1.9 2006-03-20 06:47:42 claes Exp $
* Copyright (C) 2005 SSAB Oxelsund AB.
*
* This program is free software; you can redistribute it and/or
......@@ -135,6 +135,7 @@ struct sEseg {
sLink *lp;
lst_sEntry le_bcast;
lst_sEntry le_seg;
unsigned int id;
qdb_sBuffer *bp;
char *p;
int bytes;
......@@ -281,6 +282,7 @@ struct {
static pwr_tStatus qcom_sts = PWR__SRVSTARTUP;
static void cancel_links();
static pwr_tBoolean clean_insert (lst_sEntry*, sEseg*, pwr_tBoolean);
static sEseg* create_connect (sLink*);
static void create_links();
static void decode_head(sHead*, sHead*);
......@@ -441,6 +443,86 @@ cancel_links ()
for (lp = tree_Minimum(&sts, l.links.table); lp != NULL; lp = tree_Successor(&sts, l.links.table, lp))
thread_Cancel(&lp->thread);
}
/* Free unsent segments from que that has not been
sent which will be replaced by new segment */
static pwr_tBoolean
clean_insert (
lst_sEntry *le,
sEseg *esp,
pwr_tBoolean pending
)
{
lst_sEntry *se, *li;
sEseg *sp, *nsp;
pwr_tBoolean ret_pend = FALSE;
pwr_tBoolean first = FALSE;
li = le;
if (esp->bp != NULL) {
if (esp->bp->b.msg_id > 0) {
if ((!pending) && (esp->head.flags.b.first)) {
for (sp = lst_Succ(NULL, le, &se); se != le; sp = nsp) {
li = se;
nsp = lst_Succ(NULL, se, &se);
if (sp->bp != NULL) {
if (sp->bp->b.msg_id == esp->bp->b.msg_id) {
if (sp->head.flags.b.first)
first = TRUE;
if (first) {
lst_Remove(NULL, &sp->c.le);
eseg_free(sp);
li = se;
} else {
break;
}
} else if (first) {
break;
}
} else if (first) {
break;
}
}
if (!esp->head.flags.b.last && first) ret_pend = TRUE;
if (!first) li = le;
}
else if (pending) {
for (sp = lst_Succ(NULL, le, &se); se != le; sp = nsp) {
li = se;
nsp = lst_Succ(NULL, se, &se);
if (sp->bp != NULL) {
if ((sp->bp->b.msg_id == esp->bp->b.msg_id) &&
(sp->bp == esp->bp)) {
first = TRUE;
li = se;
} else if (first) {
break;
}
} else if (first) {
break;
}
}
if (!first) li = le;
ret_pend = !esp->head.flags.b.last;
}
}
}
/* Insert new item */
lst_InsertPred(NULL, li, &esp->c.le, esp);
return ret_pend;
}
static sEseg *
......@@ -597,6 +679,7 @@ eseg_build (
sEseg *mcsp;
sLink *lp;
char *p;
int ii;
pwr_tBoolean bcast = bp->c.flags.b.broadcast;
thread_MutexLock(&l.eseg.mutex);
......@@ -620,14 +703,15 @@ eseg_build (
break;
for (
msp = NULL, size = bp->b.info.size + sizeof(bp->b.info), p = (char *)&bp->b.info;
msp = NULL, size = bp->b.info.size + sizeof(bp->b.info), ii = 0, p = (char *)&bp->b.info;
size > 0;
p += MAX_SEGSIZE, size -= MAX_SEGSIZE
ii++, p += MAX_SEGSIZE, size -= MAX_SEGSIZE
) {
sp = eseg_alloc(NULL);
sp->p = p;
sp->bp = bp;
sp->lp = lp;
sp->id = ii;
sp->size = MIN(size, MAX_SEGSIZE);
sp->head.flags.b.event = eEvent_user;
sp->head.flags.b.bcast = bcast;
......@@ -653,6 +737,7 @@ eseg_build (
) {
mcsp = NULL;
sp = msp;
ii = 0;
do {
csp = eseg_alloc(NULL);
csp->p = sp->p;
......@@ -660,6 +745,7 @@ eseg_build (
csp->lp = lp;
csp->head.flags.m = sp->head.flags.m;
csp->size = sp->size;
csp->id = ii++;
if (mcsp == NULL) {
mcsp = csp;
/* todo initiera msp->le_seg.item = msp */
......@@ -779,16 +865,16 @@ export_thread ()
thread_MutexLock(&l.bcast);
esp = sp;
do {
ssp = esp;
esp = sp;
do {
ssp->c.action = eAction_export;
que_Put(NULL, &ssp->lp->q_in, &ssp->c.le, ssp);
ssp = lst_Succ(NULL, &ssp->le_seg, NULL) ;
} while (ssp != esp);
esp = lst_Succ(NULL, &ssp->le_bcast, NULL) ;
} while (esp != sp);
ssp = esp;
do {
ssp->c.action = eAction_export;
que_Put(NULL, &ssp->lp->q_in, &ssp->c.le, ssp);
ssp = lst_Succ(NULL, &ssp->le_seg, NULL) ;
} while (ssp != esp);
esp = lst_Succ(NULL, &ssp->le_bcast, NULL) ;
} while (esp != sp);
thread_MutexUnlock(&l.bcast);
}
......@@ -1298,17 +1384,21 @@ link_thread (
)
{
uSeg *sp;
pwr_tBoolean pending = FALSE;
for ( ; ; ) {
sp = que_Get(NULL, &lp->q_in, link_tmo(lp), &lp->tmo);
switch (sp->action) {
case eAction_export:
lst_InsertPred(NULL, &lp->lh_send, &sp->c.le, sp);
break;
/* lst_InsertPred(NULL, &lp->lh_send, &sp->c.le, sp); */
pending = clean_insert(&lp->lh_send, (sEseg *) sp, pending);
if (pending) continue;
else break;
case eAction_import:
link_import(lp, &sp->i);
break;
if (pending) continue;
else break;
case eAction_tmo:
/* Nothing to do, all is done in link_send. */
break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment