Implement hard liveliness check (#316)

* Hard liveliness check

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

Update

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

* Revert "Hard liveliness check"

This reverts commit 7d94502933.

* Update

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

* Uncrustify

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

* Review comments

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>
This commit is contained in:
Pablo Garrido 2022-03-25 08:33:06 +01:00 committed by GitHub
parent b5187a9f39
commit 12bed7628f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 189 additions and 68 deletions

View File

@ -70,6 +70,9 @@ option(UCLIENT_PROFILE_MATCHING "Enable QoS matching support." OFF)
set(UCLIENT_SHARED_MEMORY_MAX_ENTITIES 4 CACHE STRING "Max number of entities involved in shared memory.")
set(UCLIENT_SHARED_MEMORY_STATIC_MEM_SIZE 10 CACHE STRING "Max number data buffers stored in shared memory")
option(UCLIENT_HARD_LIVELINESS_CHECK "Enable hard liveliness check." OFF)
set(UCLIENT_HARD_LIVELINESS_CHECK_TIMEOUT 10000 CACHE STRING "Set the hard liveliness check interval in milliseconds.")
# Off-standard features and tweaks
option(UCLIENT_TWEAK_XRCE_WRITE_LIMIT "This feature uses a tweak to allow XRCE WRITE DATA submessages grater than 64 kB." ON)

View File

@ -67,5 +67,17 @@
#cmakedefine UCLIENT_TWEAK_XRCE_WRITE_LIMIT
#cmakedefine UCLIENT_HARD_LIVELINESS_CHECK
#ifdef UCLIENT_HARD_LIVELINESS_CHECK
#define UXR_CONFIG_HARD_LIVELINESS_CHECK_TIMEOUT_STR "@UCLIENT_HARD_LIVELINESS_CHECK_TIMEOUT@"
#endif
// Version checks
#if UXR_CLIENT_VERSION_MAJOR >= 3
#error UCLIENT_HARD_LIVELINESS_CHECK shall be included in session API
#error MTU must be included in CREATE_CLIENT_Payload properties
#endif
#endif // _UXR_CLIENT_CONFIG_H_

View File

@ -24,6 +24,7 @@ extern "C"
#endif // ifdef __cplusplus
#include <uxr/client/defines.h>
#include <uxr/client/config.h>
#include <ucdr/microcdr.h>
#include <stdint.h>
@ -40,7 +41,24 @@ extern "C"
#define UXR_SAMPLE_DELTA_SEQUENCE_MAX 8
#define UXR_PACKED_SAMPLES_SEQUENCE_MAX 8
#define UXR_TRANSPORT_LOCATOR_SEQUENCE_MAX 4
#ifdef UCLIENT_PROFILE_SHARED_MEMORY
#define PROFILE_SHARED_MEMORY_SEQ_COUNT 1
#else
#define PROFILE_SHARED_MEMORY_SEQ_COUNT 0
#endif // ifdef UCLIENT_PROFILE_SHARED_MEMORY
#ifdef UCLIENT_HARD_LIVELINESS_CHECK
#define HARD_LIVELINESS_CHECK_SEQ_COUNT 1
#else
#define HARD_LIVELINESS_CHECK_SEQ_COUNT 0
#endif // ifdef UCLIENT_HARD_LIVELINESS_CHECK
#if (PROFILE_SHARED_MEMORY_SEQ_COUNT + HARD_LIVELINESS_CHECK_SEQ_COUNT) == 0
#define UXR_PROPERTY_SEQUENCE_MAX 1
#else
#define UXR_PROPERTY_SEQUENCE_MAX PROFILE_SHARED_MEMORY_SEQ_COUNT + HARD_LIVELINESS_CHECK_SEQ_COUNT
#endif // if (PROFILE_SHARED_MEMORY_SEQ_COUNT + HARD_LIVELINESS_CHECK_SEQ_COUNT) == 0
typedef struct Time_t
{

View File

@ -33,7 +33,7 @@ extern "C"
#define UXR_PING_BUF 16 // 4 (HEADER SIZE) + 4 (SUBHEADER_SIZE) + 8 (GET_Info payload)
#define GET_INFO_MSG_SIZE 8
#define GET_INFO_REQUEST_ID 9
#define GET_INFO_REQUEST_PING_ID 10
struct uxrSession;

View File

@ -21,12 +21,22 @@
#include "../../profile/shared_memory/shared_memory_internal.h"
#ifdef UCLIENT_PROFILE_SHARED_MEMORY
#define CREATE_SESSION_PROPERTIES_MAX_SIZE 21
#define PROFILE_SHARED_MEMORY_ADD_SIZE 21
#else
#define PROFILE_SHARED_MEMORY_ADD_SIZE 0
#endif /* ifdef UCLIENT_PROFILE_SHARED_MEMORY */
#ifdef UCLIENT_HARD_LIVELINESS_CHECK
#define HARD_LIVELINESS_CHECK_ADD_SIZE 26
#else
#define HARD_LIVELINESS_CHECK_ADD_SIZE 0
#endif /* ifdef UCLIENT_HARD_LIVELINESS_CHECK */
#define CREATE_SESSION_PROPERTIES_MAX_SIZE PROFILE_SHARED_MEMORY_ADD_SIZE + HARD_LIVELINESS_CHECK_ADD_SIZE
#define CREATE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + CREATE_CLIENT_PAYLOAD_SIZE + \
CREATE_SESSION_PROPERTIES_MAX_SIZE)
#else
#define CREATE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + CREATE_CLIENT_PAYLOAD_SIZE)
#endif /* ifdef UCLIENT_PROFILE_SHARED_MEMORY */
#define DELETE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + DELETE_CLIENT_PAYLOAD_SIZE)
#define HEARTBEAT_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + HEARTBEAT_PAYLOAD_SIZE)
@ -102,6 +112,12 @@ static void read_submessage_acknack(
static void read_submessage_timestamp_reply(
uxrSession* session,
ucdrBuffer* submessage);
static void read_submessage_get_info(
uxrSession* session,
ucdrBuffer* submessage);
void read_submessage_info(
uxrSession* session,
ucdrBuffer* submessage);
#ifdef PERFORMANCE_TESTING
static void read_submessage_performance(
uxrSession* session,
@ -125,9 +141,6 @@ static bool run_session_until_sync(
uxrSession* session,
int timeout);
pong_status_t uxr_acknack_pong(
ucdrBuffer* buffer);
//==================================================================
// PUBLIC
//==================================================================
@ -626,52 +639,6 @@ void uxr_flash_output_streams(
//==================================================================
// PRIVATE
//==================================================================
pong_status_t uxr_acknack_pong(
ucdrBuffer* buffer)
{
bool success = false;
bool ret = false;
bool active_session = false;
if (ucdr_buffer_remaining(buffer) > SUBHEADER_SIZE)
{
uint8_t id = 0;
uint8_t flags = 0;
uint16_t length = 0;
uxr_deserialize_submessage_header(buffer, &id, &flags, &length);
success = ucdr_buffer_remaining(buffer) >= length;
if (success && id == SUBMESSAGE_ID_INFO)
{
INFO_Payload info_payload;
success &= uxr_deserialize_BaseObjectReply(buffer, &info_payload.base);
active_session = info_payload.base.result.implementation_status;
success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_config);
if (info_payload.object_info.optional_config)
{
success &= uxr_deserialize_ObjectVariant(buffer, &info_payload.object_info.config);
}
success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_activity);
if (info_payload.object_info.optional_activity)
{
success &= ucdr_deserialize_uint8_t(buffer, &info_payload.object_info.activity.kind);
if (success && DDS_XRCE_OBJK_AGENT == info_payload.object_info.activity.kind)
{
success &= ucdr_deserialize_int16_t(buffer,
&info_payload.object_info.activity._.agent.availability);
ret = success && (info_payload.object_info.activity._.agent.availability > 0);
}
}
}
}
return ret ? (active_session ? PONG_IN_SESSION_STATUS : PONG_NO_SESSION_STATUS) : NO_PONG_STATUS;
}
bool uxr_run_session_until_pong(
uxrSession* session,
int timeout_ms)
@ -877,10 +844,6 @@ void read_message(
uxrStreamId id = uxr_stream_id_from_raw(stream_id_raw, UXR_INPUT_STREAM);
read_stream(session, ub, id, seq_num);
}
else
{
session->on_pong_flag = uxr_acknack_pong(ub);
}
}
void read_stream(
@ -989,6 +952,14 @@ void read_submessage(
read_submessage_timestamp_reply(session, submessage);
break;
case SUBMESSAGE_ID_GET_INFO:
read_submessage_get_info(session, submessage);
break;
case SUBMESSAGE_ID_INFO:
read_submessage_info(session, submessage);
break;
#ifdef PERFORMANCE_TESTING
case SUBMESSAGE_ID_PERFORMANCE:
read_submessage_performance(session, submessage, length);
@ -1095,6 +1066,66 @@ void read_submessage_timestamp_reply(
process_timestamp_reply(session, &timestamp_reply);
}
void read_submessage_get_info(
uxrSession* session,
ucdrBuffer* submessage)
{
GET_INFO_Payload get_info_payload = {
0
};
INFO_Payload info_payload = {
0
};
uxr_deserialize_GET_INFO_Payload(submessage, &get_info_payload);
info_payload.base.related_request.request_id = get_info_payload.base.request_id;
uint8_t buffer[12];
ucdrBuffer ub;
ucdr_init_buffer_origin_offset(&ub, buffer, sizeof(buffer), 0u, uxr_session_header_offset(&session->info));
uxr_serialize_INFO_Payload(&ub, &info_payload);
uxr_stamp_session_header(&session->info, 0, 0, ub.init);
send_message(session, buffer, ucdr_buffer_length(&ub));
}
void read_submessage_info(
uxrSession* session,
ucdrBuffer* submessage)
{
INFO_Payload info_payload;
bool success = true;
success &= uxr_deserialize_BaseObjectReply(submessage, &info_payload.base);
bool active_session = info_payload.base.result.implementation_status;
success &= ucdr_deserialize_bool(submessage, &info_payload.object_info.optional_config);
if (info_payload.object_info.optional_config)
{
success &= uxr_deserialize_ObjectVariant(submessage, &info_payload.object_info.config);
}
success &= ucdr_deserialize_bool(submessage, &info_payload.object_info.optional_activity);
if (info_payload.object_info.optional_activity)
{
success &= ucdr_deserialize_uint8_t(submessage, &info_payload.object_info.activity.kind);
if (success && DDS_XRCE_OBJK_AGENT == info_payload.object_info.activity.kind)
{
success &= ucdr_deserialize_int16_t(submessage,
&info_payload.object_info.activity._.agent.availability);
session->on_pong_flag = (success && (info_payload.object_info.activity._.agent.availability > 0)) ?
(active_session ?
PONG_IN_SESSION_STATUS :
PONG_NO_SESSION_STATUS) :
NO_PONG_STATUS;
}
}
}
#ifdef PERFORMANCE_TESTING
void read_submessage_performance(
uxrSession* session,

View File

@ -44,7 +44,9 @@ void uxr_buffer_create_session(
ucdrBuffer* ub,
uint16_t mtu)
{
CREATE_CLIENT_Payload payload;
CREATE_CLIENT_Payload payload = {
0
};
payload.client_representation.xrce_cookie = DDS_XRCE_XRCE_COOKIE;
payload.client_representation.xrce_version = DDS_XRCE_XRCE_VERSION;
payload.client_representation.xrce_vendor_id = VENDOR_ID_EPROSIMA;
@ -54,12 +56,35 @@ void uxr_buffer_create_session(
payload.client_representation.client_key.data[3] = info->key[3];
payload.client_representation.session_id = info->id;
payload.client_representation.optional_properties = false;
#ifdef UCLIENT_PROFILE_SHARED_MEMORY
payload.client_representation.optional_properties = true;
payload.client_representation.properties.size = 1;
payload.client_representation.properties.data[0].name = "uxr_sm";
payload.client_representation.properties.data[0].value = "1";
payload.client_representation.properties.data[payload.client_representation.properties.size].name = "uxr_sm";
payload.client_representation.properties.data[payload.client_representation.properties.size].value = "1";
payload.client_representation.properties.size++;
#endif /* ifdef UCLIENT_PROFILE_SHARED_MEMORY */
#ifdef UCLIENT_HARD_LIVELINESS_CHECK
payload.client_representation.optional_properties = true;
payload.client_representation.properties.data[payload.client_representation.properties.size].name = "uxr_hl";
const char* str = UXR_CONFIG_HARD_LIVELINESS_CHECK_TIMEOUT_STR;
if (strlen(str) > 6)
{
str = "999999";
}
char buffer[7];
const size_t leading_zeros = 6 - strlen(str);
memset(buffer, '0', leading_zeros);
memcpy(buffer + leading_zeros, str, strlen(str));
buffer[6] = '\0';
payload.client_representation.properties.data[payload.client_representation.properties.size].value = buffer;
payload.client_representation.properties.size++;
#endif /* ifdef UCLIENT_HARD_LIVELINESS_CHECK */
payload.client_representation.mtu = mtu;
info->last_request_id = UXR_REQUEST_LOGIN;

View File

@ -11,7 +11,8 @@
#include "../core/session/submessage_internal.h"
bool serialize_get_info_message(
ucdrBuffer* ub);
ucdrBuffer* ub,
const uint8_t session_id);
pong_status_t uxr_acknack_pong(
ucdrBuffer* buffer);
@ -30,6 +31,10 @@ bool uxr_read_session_header(
uint8_t* stream_id_raw,
uxrSeqNum* seq_num);
void read_submessage_info(
uxrSession* session,
ucdrBuffer* submessage);
//==================================================================
// PUBLIC
//==================================================================
@ -43,7 +48,7 @@ bool uxr_ping_agent_session(
ucdr_init_buffer(&ub, output_buffer, sizeof(output_buffer));
bool ret = false;
if (serialize_get_info_message(&ub))
if (serialize_get_info_message(&ub, session->info.id))
{
size_t message_length = ucdr_buffer_length(&ub);
@ -69,7 +74,7 @@ bool uxr_ping_agent_attempts(
ucdrBuffer ub;
ucdr_init_buffer(&ub, output_buffer, sizeof(output_buffer));
if (serialize_get_info_message(&ub))
if (serialize_get_info_message(&ub, SESSION_ID_WITHOUT_CLIENT_KEY))
{
size_t message_length = ucdr_buffer_length(&ub);
@ -110,19 +115,20 @@ inline bool uxr_ping_agent(
// PRIVATE
//==================================================================
bool serialize_get_info_message(
ucdrBuffer* ub)
ucdrBuffer* ub,
const uint8_t session_id)
{
bool res = true;
GET_INFO_Payload gi_payload;
gi_payload.base.request_id = (RequestId){{
0x00, GET_INFO_REQUEST_ID
0x00, GET_INFO_REQUEST_PING_ID
}
};
gi_payload.base.object_id = DDS_XRCE_OBJECTID_AGENT;
gi_payload.info_mask = INFO_ACTIVITY;
uxr_serialize_message_header(ub, SESSION_ID_WITHOUT_CLIENT_KEY, 0, 0, 0);
uxr_serialize_message_header(ub, session_id, 0, 0, 0);
res &= uxr_buffer_submessage_header(ub, SUBMESSAGE_ID_GET_INFO, GET_INFO_MSG_SIZE, 0);
res &= uxr_serialize_GET_INFO_Payload(ub, &gi_payload);
@ -158,3 +164,29 @@ bool listen_info_message(
return success;
}
pong_status_t uxr_acknack_pong(
ucdrBuffer* buffer)
{
bool success = false;
uxrSession fake_session = {
0
};
if (ucdr_buffer_remaining(buffer) > SUBHEADER_SIZE)
{
uint8_t id = 0;
uint8_t flags = 0;
uint16_t length = 0;
uxr_deserialize_submessage_header(buffer, &id, &flags, &length);
success = ucdr_buffer_remaining(buffer) >= length;
if (success && id == SUBMESSAGE_ID_INFO)
{
fake_session.on_pong_flag = NO_PONG_STATUS;
read_submessage_info(&fake_session, buffer);
}
}
return fake_session.on_pong_flag;
}