Commit a46874c3 authored by Ivan Tyagov's avatar Ivan Tyagov

Add heart beat check to an asynchronous callback.

parent 96f3df31
#!/bin/bash #!/bin/bash
./server -m 1 -b 1 -i 1 -l 2,3 -t 100 ./server -m 1 -b 1 -i 1 -l 2,3 -t 500
#!/bin/bash #!/bin/bash
./server -p 4841 -m 1 -b 1 -i 2 -l 1,3 -t 100 ./server -p 4841 -m 1 -b 1 -i 2 -l 1,3 -t 500
#!/bin/bash #!/bin/bash
./server -p 4842 -m 1 -b 1 -i 3 -l 1,2 -t 100 ./server -p 4842 -m 1 -b 1 -i 3 -l 1,2 -t 500
...@@ -34,40 +34,20 @@ static void dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitore ...@@ -34,40 +34,20 @@ static void dataChangeNotificationCallback(UA_Server *server, UA_UInt32 monitore
if (coupler_id!=COUPLER_ID) { if (coupler_id!=COUPLER_ID) {
//HEART_BEAT_ID_LIST //HEART_BEAT_ID_LIST
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Got heart beat from ID = %d, timestamp=%ld", coupler_id, micro_seconds); UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Got heart beat from ID = %d, timestamp=%ld", coupler_id, micro_seconds);
// convert coupler_id to str // convert coupler_id to str
char* coupler_id_str = convertInt2Str(coupler_id); char* coupler_id_str = convertInt2Str(coupler_id);
// convert micro seconds to str // convert micro seconds to str
char* micro_seconds_str = convertLongInt2Str(micro_seconds); char* micro_seconds_str = convertLongInt2Str(micro_seconds);
// XXX: implement check for dead couplers using asynchronous callbacks from open62541
int i, id;
size_t n = sizeof(HEART_BEAT_ID_LIST)/sizeof(HEART_BEAT_ID_LIST[0]);
for (int i = 0; i < n; i++) {
id = HEART_BEAT_ID_LIST[i];
if (id > 0) {
// convert to str as this is the hash key
char* id_str = convertInt2Str(id);
char *last_seen_timestamp = getItem(SUBSCRIBER_DICT, id_str);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\tcheck ID=%s, last_seen=%s", id_str, last_seen_timestamp);
if (last_seen_timestamp!=NULL){
// we do have timestamp for this coupler ID
int last_seen_timestamp_int = atoi(last_seen_timestamp);
int timestamp_delta = micro_seconds - last_seen_timestamp_int;
bool is_down = (timestamp_delta > 1000);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\t\tdelta=%d, is_down=%d", timestamp_delta, is_down);
}
}
}
// Add to our local linked list // Add to our local linked list
addItem(&SUBSCRIBER_DICT, coupler_id_str, micro_seconds_str); addItem(&SUBSCRIBER_DICT, coupler_id_str, micro_seconds_str);
} }
} }
// filter out heart_beat from Data Set // filter out heart_beat from Data Set
if(UA_Variant_hasScalarType(&var->value, &UA_TYPES[UA_TYPES_FLOAT])) { if(UA_Variant_hasScalarType(&var->value, &UA_TYPES[UA_TYPES_FLOAT])) {
float heart_beat = *(UA_Float*) var->value.data; float heart_beat = *(UA_Float*) var->value.data;
//UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "heart_beat = %f", heart_beat); //UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "heart_beat = %f", heart_beat);
...@@ -275,6 +255,30 @@ static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData) { ...@@ -275,6 +255,30 @@ static void fillTestDataSetMetaData(UA_DataSetMetaDataType *pMetaData) {
pMetaData->fields[1].valueRank = -1; /* scalar */ pMetaData->fields[1].valueRank = -1; /* scalar */
} }
void callbackCheckHeartBeat() {
int i, coupler_id;
long int micro_seconds = getMicroSeconds();
size_t n = sizeof(HEART_BEAT_ID_LIST)/sizeof(HEART_BEAT_ID_LIST[0]);
for (int i = 0; i < n; i++) {
coupler_id = HEART_BEAT_ID_LIST[i];
if (coupler_id > 0) {
// convert to str as this is the hash key
char* coupler_id_str = convertInt2Str(coupler_id);
char *last_seen_timestamp = getItem(SUBSCRIBER_DICT, coupler_id_str);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Check ID=%s, last_seen=%s", coupler_id_str, last_seen_timestamp);
if (last_seen_timestamp != NULL){
// we do have timestamp for this coupler ID
int last_seen_timestamp_int = atoi(last_seen_timestamp);
int timestamp_delta = micro_seconds - last_seen_timestamp_int;
bool is_down = (timestamp_delta > 1000);
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "\tdelta=%d, is_down=%d", timestamp_delta, is_down);
}
}
}
}
static int enableSubscribeToHeartBeat(UA_Server *server, UA_ServerConfig *config){ static int enableSubscribeToHeartBeat(UA_Server *server, UA_ServerConfig *config){
// enable subscribe to keep-alive messages // enable subscribe to keep-alive messages
UA_String transportProfile = UA_STRING(DEFAULT_TRANSPORT_PROFILE); UA_String transportProfile = UA_STRING(DEFAULT_TRANSPORT_PROFILE);
...@@ -289,4 +293,8 @@ static int enableSubscribeToHeartBeat(UA_Server *server, UA_ServerConfig *config ...@@ -289,4 +293,8 @@ static int enableSubscribeToHeartBeat(UA_Server *server, UA_ServerConfig *config
/* Add SubscribedVariables to the created DataSetReader */ /* Add SubscribedVariables to the created DataSetReader */
addSubscribedVariables(server, readerIdentifier); addSubscribedVariables(server, readerIdentifier);
// add a callback which will check related coupler's heart beats
UA_UInt64 callbackId = 2;
UA_Server_addRepeatedCallback(server, callbackCheckHeartBeat, NULL, HEART_BEAT_INTERVAL, &callbackId);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment