Commit c5484255 authored by Léo-Paul Géneau's avatar Léo-Paul Géneau 👾

Pub/Sub: remove subscribeOnly function

Merge subscribeOnly function into runPubsub
parent 544191ac
......@@ -47,15 +47,6 @@ typedef struct {
} getter;
} VariableData;
int subscribeOnly(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData *variableArray, size_t nbVariable,
UA_UInt32 id, UA_UInt32 nbReader, UA_Duration interval,
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic),
UA_UInt16 (*get_reader_id)(UA_UInt32 nb),
void (*update)(UA_UInt32 id, const UA_DataValue*),
UA_Boolean *running);
int runPubsub(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData *variableArray, size_t nbVariable,
......@@ -63,8 +54,8 @@ int runPubsub(UA_String *transportProfile,
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic),
UA_UInt16 (*get_reader_id)(UA_UInt32 nb),
VariableData (*get_value)(UA_String identifier),
void (*update)(UA_UInt32 id, const UA_DataValue*),
UA_Boolean *running);
void (*update)(UA_UInt32 id, const UA_DataValue*, bool print),
bool publish, UA_Boolean *running);
UA_String get_message(void);
......@@ -72,10 +63,6 @@ UA_UInt16 get_drone_id(UA_UInt32 nb);
void init_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic);
void pubsub_update_coordinates(UA_UInt32 id, const UA_DataValue *var);
void pubsub_print_coordinates(UA_UInt32 id, const UA_DataValue *var);
VariableData pubsub_get_value(UA_String identifier);
DLL_PUBLIC JSModuleDef *js_init_module(JSContext *ctx, const char *module_name);
......
......@@ -10,9 +10,10 @@ UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent,
readerGroupIdent, readerIdent;
UA_DataSetReaderConfig readerConfig;
bool isPublisher;
VariableData (*pubsubGetValue)(UA_String identifier);
static void (*callbackUpdate)(UA_UInt32, const UA_DataValue*);
static void (*callbackUpdate)(UA_UInt32, const UA_DataValue*, bool print);
static void fillDataSetMetaData(UA_DataSetMetaDataType *pMetaData,
VariableData *variableArray,
......@@ -251,7 +252,7 @@ dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitoredItemId,
void *monitoredItemContext, const UA_NodeId *nodeId,
void *nodeContext, UA_UInt32 attributeId,
const UA_DataValue *var) {
callbackUpdate(nodeId->identifier.numeric, var);
callbackUpdate(nodeId->identifier.numeric, var, !isPublisher);
}
/**
......@@ -437,7 +438,7 @@ subscribe(UA_Server *server,
UA_UInt32 id, UA_UInt32 nbReader, UA_Duration interval,
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic),
UA_UInt16 (*get_reader_id)(UA_UInt32 nb),
void (*update)(UA_UInt32 id, const UA_DataValue*)) {
void (*update)(UA_UInt32 id, const UA_DataValue*, bool print)) {
UA_UInt16 publisherIdent;
UA_StatusCode retval;
char readerName[19];
......@@ -472,28 +473,6 @@ subscribe(UA_Server *server,
return retval;
}
int subscribeOnly(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData *variableArray, size_t nbVariable,
UA_UInt32 id, UA_UInt32 nbReader, UA_Duration interval,
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic),
UA_UInt16 (*get_reader_id)(UA_UInt32 nb),
void (*update)(UA_UInt32 id, const UA_DataValue*),
UA_Boolean *running) {
UA_Server *server;
UA_StatusCode retval;
server = setServer(transportProfile, networkAddressUrl, id);
setVariableType(server, variableArray, nbVariable);
subscribe(server, variableArray, nbVariable, id, nbReader, interval,
init_node_id, get_reader_id, update);
retval = UA_Server_run(server, running);
UA_Server_delete(server);
return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}
int runPubsub(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl,
VariableData *variableArray, size_t nbVariable,
......@@ -501,8 +480,8 @@ int runPubsub(UA_String *transportProfile,
void (*init_node_id)(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic),
UA_UInt16 (*get_reader_id)(UA_UInt32 nb),
VariableData (*get_value)(UA_String identifier),
void (*update)(UA_UInt32 id, const UA_DataValue*),
UA_Boolean *running) {
void (*update)(UA_UInt32 id, const UA_DataValue*, bool print),
bool publish, UA_Boolean *running) {
UA_Server *server;
UA_StatusCode retval;
......@@ -511,21 +490,24 @@ int runPubsub(UA_String *transportProfile,
/* Publishing */
pubsubGetValue = get_value;
isPublisher = publish;
if (isPublisher) {
pubsubGetValue = get_value;
addPublishedDataSet(server, id);
for(UA_UInt32 i = 0; i < nbVariable; i++) {
retval = addDataSourceVariable(server, variableArray[i]);
addPublishedDataSet(server, id);
for(UA_UInt32 i = 0; i < nbVariable; i++) {
retval = addDataSourceVariable(server, variableArray[i]);
if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
addDataSetField(server, variableArray[i]);
}
addWriterGroup(server, interval);
retval = addDataSetWriter(server);
if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
addDataSetField(server, variableArray[i]);
}
addWriterGroup(server, interval);
retval = addDataSetWriter(server);
if (retval != UA_STATUSCODE_GOOD)
return EXIT_FAILURE;
/* Subscribing */
subscribe(server, variableArray, nbVariable, id, nbReader, interval,
......
......@@ -260,7 +260,7 @@ void init_node_id(UA_UInt32 id, UA_UInt32 nb, UA_UInt32 magic) {
}
}
void pubsub_update_coordinates(UA_UInt32 id, const UA_DataValue *var)
static void pubsub_update_variables(UA_UInt32 id, const UA_DataValue *var, bool print)
{
JSDroneData* s;
UA_String uaStr;
......@@ -275,59 +275,43 @@ void pubsub_update_coordinates(UA_UInt32 id, const UA_DataValue *var)
s->longitude = positionArray[1];
s->altitudeAbs = positionArray[2];
s->altitudeRel = positionArray[3];
if (print) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT,
"Received position of drone %d: %f° %f° %fm %fm",
s->id, s->latitude, s->longitude, s->altitudeAbs, s->altitudeRel);
}
return;
} else if (s->speedArrayId == id) {
speedArray = (UA_Float*) var->value.data;
s->yaw = speedArray[0];
s->speed = speedArray[1];
s->climbRate = speedArray[2];
if (print) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT,
"Received speed of drone %d: %f° %fm/s %fm/s",
s->id, s->yaw, s->speed, s->climbRate);
}
return;
} else if (s->messageId == id) {
uaStr = *(UA_String*) var->value.data;
pthread_mutex_lock(&mutex);
while(strlen(s->message) != 0)
pthread_cond_wait(&threadCond, &mutex);
memcpy(s->message, uaStr.data, uaStr.length);
s->message[uaStr.length] = '\0';
pthread_mutex_unlock(&mutex);
return;
}
}
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "NodeId not found");
}
void pubsub_print_coordinates(UA_UInt32 id, const UA_DataValue *var)
{
JSDroneData* s;
UA_String uaStr;
UA_Double* positionArray;
UA_Float* directionArray;
for(UA_UInt32 i = 0; i < nbDrone; i++) {
s = (JSDroneData *) JS_GetOpaque(droneObjectIdList[i], jsDroneClassId);
if (s->positionArrayId == id) {
positionArray = *(UA_Double**) var->value.data;
s->latitude = positionArray[0];
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Received latitude of drone %d: %f°", s->id, s->latitude);
s->longitude = positionArray[1];
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Received longitude of drone %d: %f°", s->id, s->longitude);
s->altitudeAbs = positionArray[2];
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Received absolute altitude of drone %d: %fm", s->id, s->altitudeAbs);
s->altitudeRel = positionArray[3];
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Received relative altitude of drone %d: %fm", s->id, s->altitudeRel);
s->altitudeAbs = *(UA_Float*) var->value.data;
return;
} else if (s->directionArrayId == id) {
directionArray = (UA_Float*) var->value.data;
s->yaw = directionArray[0];
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Received yaw angle of drone %d: %f°", s->id, s->yaw);
s->speed = directionArray[1];
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Received airspeed of drone %d: %fm/s", s->id, s->speed);
return;
}else if (s->messageId == id) {
uaStr = *(UA_String*) var->value.data;
if (!print) {
pthread_mutex_lock(&mutex);
while(strlen(s->message) != 0)
pthread_cond_wait(&threadCond, &mutex);
}
memcpy(s->message, uaStr.data, uaStr.length);
s->message[uaStr.length] = '\0';
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT, "Received message of drone %d: %s", s->id, s->message);
if (print) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_CLIENT,
"Received message for drone %d: %s", s->id, s->message);
} else {
pthread_mutex_unlock(&mutex);
}
return;
}
}
......@@ -373,18 +357,11 @@ static JSValue js_run_pubsub(JSContext *ctx, JSValueConst this_val,
if (JS_ToFloat64(ctx, &interval, argv[4]))
return JS_EXCEPTION;
if (JS_ToBool(ctx, argv[5])) {
res = runPubsub(&transportProfile, &networkAddressUrl,
droneVariableArray, countof(droneVariableArray), id,
nbDrone, interval, init_node_id, get_drone_id,
pubsub_get_value, pubsub_update_coordinates,
&pubsubShouldRun);
} else {
res = subscribeOnly(&transportProfile, &networkAddressUrl,
droneVariableArray, countof(droneVariableArray), id,
nbDrone, interval, init_node_id, get_drone_id,
pubsub_print_coordinates, &pubsubShouldRun);
}
res = runPubsub(&transportProfile, &networkAddressUrl, droneVariableArray,
countof(droneVariableArray), id, nbDrone, interval,
init_node_id, get_drone_id, pubsub_get_value,
pubsub_update_variables, JS_ToBool(ctx, argv[5]),
&pubsubShouldRun);
pubsubExited = true;
JS_FreeCString(ctx, ipv6);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment