Commit 6a70e366 authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

Use mutex to manipulate queues

parent 4e0e3fa2
......@@ -37,7 +37,9 @@ typedef struct {
UA_Float speed;
UA_Float climbRate;
StrQueue receiveMessageQueue;
pthread_mutex_t messageMutex;
StrQueue receiveLogQueue;
pthread_mutex_t logMutex;
} JSDroneData;
typedef struct {
......
......@@ -107,15 +107,18 @@ static UA_Boolean pubsubExited = true;
static UA_UInt32 nbDrone;
static UA_UInt32 nbSubscriber;
static JSValueConst *droneObjectIdList;
static StrQueue sendMessageQueue = {
StrQueue sendMessageQueue = {
.head = NULL,
.tail = NULL,
};
pthread_mutex_t messageMutex;
UA_String currentSendMessage;
static StrQueue sendLogQueue = {
StrQueue sendLogQueue = {
.head = NULL,
.tail = NULL,
};
pthread_mutex_t logMutex;
UA_String currentSendLog;
bool isADrone;
......@@ -257,19 +260,22 @@ static JSValue js_drone_init(JSContext *ctx, JSValueConst thisVal,
{
int nb;
JSDroneData *s = (JSDroneData *) JS_GetOpaque2(ctx, thisVal, jsDroneClassId);
if (!s)
return JS_EXCEPTION;
if (JS_ToInt32(ctx, &nb, argv[0]))
return JS_EXCEPTION;
droneObjectIdList[nb] = thisVal;
return JS_UNDEFINED;
}
static JSValue readDroneDataStr(JSContext *ctx, StrQueue *pQueue, bool keepAtLeastAnElement)
static JSValue getStrFromQueue(JSContext *ctx, StrQueue *pQueue, pthread_mutex_t *mutex)
{
JSValue res;
struct strNode *current;
pthread_mutex_lock(mutex);
current = pQueue->head;
if (current != NULL) {
res = JS_NewString(ctx, current->str);
......@@ -277,14 +283,13 @@ static JSValue readDroneDataStr(JSContext *ctx, StrQueue *pQueue, bool keepAtLea
pQueue->head = current->next;
delete_str_node(current);
} else {
if (!keepAtLeastAnElement) {
pQueue->head = (pQueue->tail = NULL);
delete_str_node(current);
}
}
} else {
res = JS_NewString(ctx, "");
}
pthread_mutex_unlock(mutex);
return res;
}
......@@ -311,9 +316,9 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic)
case 7:
return JS_NewFloat64(ctx, s->climbRate);
case 8:
return readDroneDataStr(ctx, &(s->receiveMessageQueue), true);
return getStrFromQueue(ctx, &(s->receiveMessageQueue), &(s->messageMutex));
case 9:
return readDroneDataStr(ctx, &(s->receiveLogQueue), false);
return getStrFromQueue(ctx, &(s->receiveLogQueue), &(s->logMutex));
case 10:
return JS_NewInt64(ctx, s->timestamp);
default:
......@@ -321,32 +326,35 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic)
}
}
static void addStrToQueue(const char *str, StrQueue *pQueue)
static void addStrToQueue(const char *str, StrQueue *pQueue, pthread_mutex_t *mutex)
{
struct strNode *newNode;
newNode = (struct strNode*)malloc(sizeof(struct strNode));
newNode->str = strdup(str);
newNode->next = NULL;
pthread_mutex_lock(mutex);
if (pQueue->tail == NULL) {
pQueue->head = pQueue->tail = newNode;
} else {
pQueue->tail->next = newNode;
pQueue->tail = newNode;
}
pthread_mutex_unlock(mutex);
}
static JSValue add_jsstr_to_queue(JSContext *ctx, JSValueConst jsStr,
StrQueue *pQueue)
StrQueue *pQueue, pthread_mutex_t *mutex)
{
const char *str;
str = JS_ToCString(ctx, jsStr);
if (strlen(str) > MAX_STR_SIZE) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "String too long");
JS_FreeCString(ctx, str);
return JS_EXCEPTION;
}
addStrToQueue(str, pQueue);
addStrToQueue(str, pQueue, mutex);
JS_FreeCString(ctx, str);
return JS_UNDEFINED;
}
......@@ -354,13 +362,13 @@ static JSValue add_jsstr_to_queue(JSContext *ctx, JSValueConst jsStr,
static JSValue js_drone_set_message(JSContext *ctx, JSValueConst thisVal,
int argc, JSValueConst *argv)
{
return add_jsstr_to_queue(ctx, argv[0], &sendMessageQueue);
return add_jsstr_to_queue(ctx, argv[0], &sendMessageQueue, &messageMutex);
}
static JSValue js_drone_set_log(JSContext *ctx, JSValueConst thisVal,
int argc, JSValueConst *argv)
{
return add_jsstr_to_queue(ctx, argv[0], &sendLogQueue);
return add_jsstr_to_queue(ctx, argv[0], &sendLogQueue, &logMutex);
}
static UA_Boolean UA_String_isEmpty(const UA_String *s) {
......@@ -372,28 +380,32 @@ static void clear_str(UA_String *pstr) {
UA_String_clear(pstr);
}
static UA_String get_StrQueue_content(StrQueue *pQueue, UA_String currentStr)
static UA_String get_ua_str_from_queue(StrQueue *pQueue, pthread_mutex_t *mutex, UA_String currentStr)
{
struct strNode *current;
clear_str(&currentStr);
current = pQueue->head;
pthread_mutex_lock(mutex);
if (current != NULL) {
clear_str(&currentStr);
currentStr = UA_STRING_ALLOC(current->str);
pQueue->head = current->next == NULL ? (pQueue->tail = NULL) : current->next;
delete_str_node(current);
} else {
currentStr = UA_STRING("");
}
pthread_mutex_unlock(mutex);
return currentStr;
}
UA_String get_message(void)
{
return get_StrQueue_content(&sendMessageQueue, currentSendMessage);
return get_ua_str_from_queue(&sendMessageQueue, &messageMutex, currentSendMessage);
}
UA_String get_log(void)
{
return get_StrQueue_content(&sendLogQueue, currentSendLog);
return get_ua_str_from_queue(&sendLogQueue, &logMutex, currentSendLog);
}
static JSClassDef jsDroneClass = {
......@@ -482,16 +494,17 @@ VariableData pubsub_get_value(UA_String identifier) {
return varDetails;
}
static void setDroneDataStr(void *data, StrQueue* pQueue)
static void setDroneDataStr(void *data, StrQueue* pQueue, pthread_mutex_t *mutex)
{
UA_String uaStr = *(UA_String*) data;
char str[MAX_STR_SIZE];
memcpy(str, uaStr.data, uaStr.length);
str[uaStr.length] = '\0';
addStrToQueue(str, pQueue);
addStrToQueue(str, pQueue, mutex);
}
static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool print)
static void pubsub_update_variables(UA_UInt32 id,
const UA_DataValue *var, bool print)
{
JSDroneData* s;
UA_Int64* updatedPositionArray;
......@@ -526,11 +539,11 @@ static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool
}
return;
} else if (pubsubIdsArray[i].message == id) {
setDroneDataStr(var->value.data, &(s->receiveMessageQueue));
setDroneDataStr(var->value.data, &(s->receiveMessageQueue), &(s->messageMutex));
return;
} else if (pubsubIdsArray[i].log == id) {
if (!isADrone) {
setDroneDataStr(var->value.data, &(s->receiveLogQueue));
setDroneDataStr(var->value.data, &(s->receiveLogQueue), &(s->logMutex));
}
return;
}
......@@ -641,7 +654,6 @@ static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst thisVal,
while(!pubsubExited)
sleep(1);
free(droneObjectIdList);
cleanQueue(&sendMessageQueue);
clear_str(&currentSendMessage);
cleanQueue(&sendLogQueue);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment