Commit 38c9977c authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

Use pipes to store pubsub messages

parent 4e0e3fa2
...@@ -17,15 +17,6 @@ ...@@ -17,15 +17,6 @@
#define MAX_STR_SIZE 1024 #define MAX_STR_SIZE 1024
struct strNode {
char *str;
struct strNode *next;
};
typedef struct {
struct strNode *head;
struct strNode *tail;
} StrQueue;
typedef struct { typedef struct {
UA_UInt16 id; UA_UInt16 id;
UA_Double latitude; UA_Double latitude;
...@@ -36,8 +27,8 @@ typedef struct { ...@@ -36,8 +27,8 @@ typedef struct {
UA_Float yaw; UA_Float yaw;
UA_Float speed; UA_Float speed;
UA_Float climbRate; UA_Float climbRate;
StrQueue receiveMessageQueue; int receiveMessagePipefds[2];
StrQueue receiveLogQueue; int receiveLogPipefds[2];
} JSDroneData; } JSDroneData;
typedef struct { typedef struct {
......
...@@ -107,16 +107,11 @@ static UA_Boolean pubsubExited = true; ...@@ -107,16 +107,11 @@ static UA_Boolean pubsubExited = true;
static UA_UInt32 nbDrone; static UA_UInt32 nbDrone;
static UA_UInt32 nbSubscriber; static UA_UInt32 nbSubscriber;
static JSValueConst *droneObjectIdList; static JSValueConst *droneObjectIdList;
static StrQueue sendMessageQueue = {
.head = NULL,
.tail = NULL,
};
UA_String currentSendMessage; UA_String currentSendMessage;
static StrQueue sendLogQueue = { int sendMessagePipefds[2];
.head = NULL,
.tail = NULL,
};
UA_String currentSendLog; UA_String currentSendLog;
int sendLogPipefds[2];
bool isADrone; bool isADrone;
...@@ -190,30 +185,23 @@ static const JSCFunctionListEntry js_position_proto_funcs[] = { ...@@ -190,30 +185,23 @@ static const JSCFunctionListEntry js_position_proto_funcs[] = {
// Drone class functions // Drone class functions
static void delete_str_node(struct strNode *node) {
free(node->str);
free(node);
}
static void cleanQueue(StrQueue *pQueue)
{
struct strNode *current;
while (pQueue->head != NULL) {
current = pQueue->head;
pQueue->head = current->next;
delete_str_node(current);
}
}
static void js_drone_finalizer(JSRuntime *rt, JSValue val) static void js_drone_finalizer(JSRuntime *rt, JSValue val)
{ {
JSDroneData *s = (JSDroneData *) JS_GetOpaque(val, jsDroneClassId); JSDroneData *s = (JSDroneData *) JS_GetOpaque(val, jsDroneClassId);
cleanQueue(&(s->receiveMessageQueue)); close(s->receiveMessagePipefds[0]);
cleanQueue(&(s->receiveLogQueue)); close(s->receiveMessagePipefds[1]);
close(s->receiveLogPipefds[0]);
close(s->receiveLogPipefds[1]);
js_free_rt(rt, s); js_free_rt(rt, s);
} }
static int set_pipe_non_blocking(int rpipefd) {
int res = fcntl(rpipefd, F_SETFL, fcntl(rpipefd, F_GETFL) | O_NONBLOCK);
if (res < 0)
perror("Failed to set pipe non blocking");
return res;
}
static JSValue js_drone_ctor(JSContext *ctx, JSValueConst newTarget, static JSValue js_drone_ctor(JSContext *ctx, JSValueConst newTarget,
int argc, JSValueConst *argv) int argc, JSValueConst *argv)
{ {
...@@ -228,14 +216,22 @@ static JSValue js_drone_ctor(JSContext *ctx, JSValueConst newTarget, ...@@ -228,14 +216,22 @@ static JSValue js_drone_ctor(JSContext *ctx, JSValueConst newTarget,
if (JS_ToUint32(ctx, &uint32, argv[0])) if (JS_ToUint32(ctx, &uint32, argv[0]))
goto fail; goto fail;
s->id = (uint16_t)uint32; s->id = (uint16_t)uint32;
s->receiveMessageQueue = (StrQueue){
.head = NULL, if (pipe(s->receiveMessagePipefds) < 0) {
.tail = NULL, perror("Failed to create receiveMessagePipe");
}; goto fail;
s->receiveLogQueue = (StrQueue){ }
.head = NULL,
.tail = NULL, if (set_pipe_non_blocking(s->receiveMessagePipefds[0]) < 0)
}; goto fail;
if (pipe(s->receiveLogPipefds) < 0) {
perror("Failed to create receiveLogPipe");
goto fail;
}
if (set_pipe_non_blocking(s->receiveLogPipefds[0]) < 0)
goto fail;
proto = JS_GetPropertyStr(ctx, newTarget, "prototype"); proto = JS_GetPropertyStr(ctx, newTarget, "prototype");
if (JS_IsException(proto)) if (JS_IsException(proto))
...@@ -247,6 +243,10 @@ static JSValue js_drone_ctor(JSContext *ctx, JSValueConst newTarget, ...@@ -247,6 +243,10 @@ static JSValue js_drone_ctor(JSContext *ctx, JSValueConst newTarget,
JS_SetOpaque(obj, s); JS_SetOpaque(obj, s);
return obj; return obj;
fail: fail:
close(s->receiveMessagePipefds[0]);
close(s->receiveMessagePipefds[1]);
close(s->receiveLogPipefds[0]);
close(s->receiveLogPipefds[1]);
js_free(ctx, s); js_free(ctx, s);
JS_FreeValue(ctx, obj); JS_FreeValue(ctx, obj);
return JS_EXCEPTION; return JS_EXCEPTION;
...@@ -257,31 +257,32 @@ static JSValue js_drone_init(JSContext *ctx, JSValueConst thisVal, ...@@ -257,31 +257,32 @@ static JSValue js_drone_init(JSContext *ctx, JSValueConst thisVal,
{ {
int nb; int nb;
JSDroneData *s = (JSDroneData *) JS_GetOpaque2(ctx, thisVal, jsDroneClassId); JSDroneData *s = (JSDroneData *) JS_GetOpaque2(ctx, thisVal, jsDroneClassId);
if (!s) if (!s)
return JS_EXCEPTION; return JS_EXCEPTION;
if (JS_ToInt32(ctx, &nb, argv[0])) if (JS_ToInt32(ctx, &nb, argv[0]))
return JS_EXCEPTION; return JS_EXCEPTION;
droneObjectIdList[nb] = thisVal; droneObjectIdList[nb] = thisVal;
return JS_UNDEFINED; return JS_UNDEFINED;
} }
static JSValue readDroneDataStr(JSContext *ctx, StrQueue *pQueue, bool keepAtLeastAnElement) static ssize_t read_from_pipe(int file, char *rd_buf, size_t max_read)
{
ssize_t nread = read(file, rd_buf, max_read);
if (nread == -1 && errno != EAGAIN)
perror("Failed to read pipe");
return nread;
}
static JSValue getStrFromPipe(JSContext *ctx, int rpipefd)
{ {
JSValue res; JSValue res;
struct strNode *current; char rd_buf[MAX_STR_SIZE];
current = pQueue->head; if (read_from_pipe(rpipefd, rd_buf, MAX_STR_SIZE) > 0) {
if (current != NULL) { res = JS_NewString(ctx, rd_buf);
res = JS_NewString(ctx, current->str);
if (current->next != NULL) {
pQueue->head = current->next;
delete_str_node(current);
} else {
if (!keepAtLeastAnElement) {
pQueue->head = (pQueue->tail = NULL);
delete_str_node(current);
}
}
} else { } else {
res = JS_NewString(ctx, ""); res = JS_NewString(ctx, "");
} }
...@@ -311,9 +312,9 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic) ...@@ -311,9 +312,9 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic)
case 7: case 7:
return JS_NewFloat64(ctx, s->climbRate); return JS_NewFloat64(ctx, s->climbRate);
case 8: case 8:
return readDroneDataStr(ctx, &(s->receiveMessageQueue), true); return getStrFromPipe(ctx, s->receiveMessagePipefds[0]);
case 9: case 9:
return readDroneDataStr(ctx, &(s->receiveLogQueue), false); return getStrFromPipe(ctx, s->receiveLogPipefds[0]);
case 10: case 10:
return JS_NewInt64(ctx, s->timestamp); return JS_NewInt64(ctx, s->timestamp);
default: default:
...@@ -321,46 +322,39 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic) ...@@ -321,46 +322,39 @@ static JSValue js_drone_get(JSContext *ctx, JSValueConst thisVal, int magic)
} }
} }
static void addStrToQueue(const char *str, StrQueue *pQueue) static void write_to_pipe(int file, const char *wr_buf, size_t max_written)
{ {
struct strNode *newNode; ssize_t nwritten = write(file, wr_buf, max_written);
newNode = (struct strNode*)malloc(sizeof(struct strNode)); if (nwritten < 0)
newNode->str = strdup(str); perror("Failed to write pipe");
newNode->next = NULL;
if (pQueue->tail == NULL) {
pQueue->head = pQueue->tail = newNode;
} else {
pQueue->tail->next = newNode;
pQueue->tail = newNode;
}
} }
static JSValue add_jsstr_to_queue(JSContext *ctx, JSValueConst jsStr, static JSValue add_jsstr_to_pipe(JSContext *ctx, JSValueConst jsStr,
StrQueue *pQueue) int wpipefd)
{ {
const char *str; const char *wr_buf;
str = JS_ToCString(ctx, jsStr); wr_buf = JS_ToCString(ctx, jsStr);
if (strlen(str) > MAX_STR_SIZE) { if (strlen(wr_buf) > MAX_STR_SIZE) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "String too long"); UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "String too long");
JS_FreeCString(ctx, wr_buf);
return JS_EXCEPTION; return JS_EXCEPTION;
} }
addStrToQueue(str, pQueue); write_to_pipe(wpipefd, wr_buf, MAX_STR_SIZE);
JS_FreeCString(ctx, str); JS_FreeCString(ctx, wr_buf);
return JS_UNDEFINED; return JS_UNDEFINED;
} }
static JSValue js_drone_set_message(JSContext *ctx, JSValueConst thisVal, static JSValue js_drone_set_message(JSContext *ctx, JSValueConst thisVal,
int argc, JSValueConst *argv) int argc, JSValueConst *argv)
{ {
return add_jsstr_to_queue(ctx, argv[0], &sendMessageQueue); return add_jsstr_to_pipe(ctx, argv[0], sendMessagePipefds[1]);
} }
static JSValue js_drone_set_log(JSContext *ctx, JSValueConst thisVal, static JSValue js_drone_set_log(JSContext *ctx, JSValueConst thisVal,
int argc, JSValueConst *argv) int argc, JSValueConst *argv)
{ {
return add_jsstr_to_queue(ctx, argv[0], &sendLogQueue); return add_jsstr_to_pipe(ctx, argv[0], sendLogPipefds[1]);
} }
static UA_Boolean UA_String_isEmpty(const UA_String *s) { static UA_Boolean UA_String_isEmpty(const UA_String *s) {
...@@ -372,28 +366,23 @@ static void clear_str(UA_String *pstr) { ...@@ -372,28 +366,23 @@ static void clear_str(UA_String *pstr) {
UA_String_clear(pstr); UA_String_clear(pstr);
} }
static UA_String get_StrQueue_content(StrQueue *pQueue, UA_String currentStr) static UA_String get_ua_str_from_pipe(int rpipefd, UA_String currentStr)
{ {
struct strNode *current; char rd_buf[MAX_STR_SIZE];
current = pQueue->head;
if (current != NULL) {
clear_str(&currentStr); clear_str(&currentStr);
currentStr = UA_STRING_ALLOC(current->str); currentStr = (read_from_pipe(rpipefd, rd_buf, MAX_STR_SIZE) > 0) ?
pQueue->head = current->next == NULL ? (pQueue->tail = NULL) : current->next; UA_STRING_ALLOC(rd_buf) : UA_STRING("");
delete_str_node(current);
}
return currentStr; return currentStr;
} }
UA_String get_message(void) UA_String get_message(void)
{ {
return get_StrQueue_content(&sendMessageQueue, currentSendMessage); return get_ua_str_from_pipe(sendMessagePipefds[0], currentSendMessage);
} }
UA_String get_log(void) UA_String get_log(void)
{ {
return get_StrQueue_content(&sendLogQueue, currentSendLog); return get_ua_str_from_pipe(sendLogPipefds[0], currentSendLog);
} }
static JSClassDef jsDroneClass = { static JSClassDef jsDroneClass = {
...@@ -482,16 +471,18 @@ VariableData pubsub_get_value(UA_String identifier) { ...@@ -482,16 +471,18 @@ VariableData pubsub_get_value(UA_String identifier) {
return varDetails; return varDetails;
} }
static void setDroneDataStr(void *data, StrQueue* pQueue) static void pushToPipe(void *data, int wpipefd)
{ {
UA_String uaStr = *(UA_String*) data; UA_String uaStr = *(UA_String*) data;
char str[MAX_STR_SIZE]; char wr_buf[MAX_STR_SIZE];
memcpy(str, uaStr.data, uaStr.length);
str[uaStr.length] = '\0'; memcpy(wr_buf, uaStr.data, uaStr.length);
addStrToQueue(str, pQueue); wr_buf[uaStr.length] = '\0';
write_to_pipe(wpipefd, wr_buf, MAX_STR_SIZE);
} }
static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool print) static void pubsub_update_variables(UA_UInt32 id,
const UA_DataValue *var, bool print)
{ {
JSDroneData* s; JSDroneData* s;
UA_Int64* updatedPositionArray; UA_Int64* updatedPositionArray;
...@@ -526,11 +517,11 @@ static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool ...@@ -526,11 +517,11 @@ static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool
} }
return; return;
} else if (pubsubIdsArray[i].message == id) { } else if (pubsubIdsArray[i].message == id) {
setDroneDataStr(var->value.data, &(s->receiveMessageQueue)); pushToPipe(var->value.data, s->receiveMessagePipefds[1]);
return; return;
} else if (pubsubIdsArray[i].log == id) { } else if (pubsubIdsArray[i].log == id) {
if (!isADrone) { if (!isADrone) {
setDroneDataStr(var->value.data, &(s->receiveLogQueue)); pushToPipe(var->value.data, s->receiveLogPipefds[1]);
} }
return; return;
} }
...@@ -623,15 +614,37 @@ static JSValue js_init_pubsub(JSContext *ctx, JSValueConst thisVal, ...@@ -623,15 +614,37 @@ static JSValue js_init_pubsub(JSContext *ctx, JSValueConst thisVal,
int argc, JSValueConst *argv) int argc, JSValueConst *argv)
{ {
if (JS_ToUint32(ctx, &nbDrone, argv[0])) if (JS_ToUint32(ctx, &nbDrone, argv[0]))
return JS_EXCEPTION; goto fail;
if (JS_ToUint32(ctx, &nbSubscriber, argv[1])) if (JS_ToUint32(ctx, &nbSubscriber, argv[1]))
return JS_EXCEPTION; goto fail;
if (pipe(sendMessagePipefds) < 0) {
perror("Failed to create sendMessagePipe");
goto fail;
}
if (set_pipe_non_blocking(sendMessagePipefds[0]) < 0)
goto fail;
if (pipe(sendLogPipefds) < 0) {
perror("Failed to create sendLogPipe");
goto fail;
}
if (set_pipe_non_blocking(sendLogPipefds[0]) < 0)
perror("Failed to set sendLogPipe non blocking");
currentSendMessage = UA_STRING(""); currentSendMessage = UA_STRING("");
currentSendLog = UA_STRING(""); currentSendLog = UA_STRING("");
droneObjectIdList = (JSValue *) malloc((nbDrone + nbSubscriber) * sizeof(JSValueConst)); droneObjectIdList = (JSValue *) malloc((nbDrone + nbSubscriber) * sizeof(JSValueConst));
return JS_NewInt32(ctx, 0); return JS_NewInt32(ctx, 0);
fail:
close(sendMessagePipefds[0]);
close(sendMessagePipefds[1]);
close(sendLogPipefds[0]);
close(sendLogPipefds[1]);
return JS_EXCEPTION;
} }
static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst thisVal, static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst thisVal,
...@@ -641,10 +654,11 @@ static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst thisVal, ...@@ -641,10 +654,11 @@ static JSValue js_stop_pubsub(JSContext *ctx, JSValueConst thisVal,
while(!pubsubExited) while(!pubsubExited)
sleep(1); sleep(1);
free(droneObjectIdList); free(droneObjectIdList);
close(sendMessagePipefds[0]);
cleanQueue(&sendMessageQueue); close(sendMessagePipefds[1]);
clear_str(&currentSendMessage); clear_str(&currentSendMessage);
cleanQueue(&sendLogQueue); close(sendLogPipefds[0]);
close(sendLogPipefds[1]);
clear_str(&currentSendLog); clear_str(&currentSendLog);
return JS_NewInt32(ctx, 0); return JS_NewInt32(ctx, 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment