Fix in session ping (#289)

* Fix race condition

* Rename arguments existing API

* Added new API

* Add examples

* Uncrustify

* Add attemps

* Refactor order

* Fix unlock

* Fix warning

* Fix stream selection

* Fix windows warning

* Update

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

* Fix memcheck

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

* Fix memcheck

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

* Uncrustify

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>
This commit is contained in:
Pablo Garrido 2021-11-25 15:31:03 +01:00 committed by GitHub
parent e81b10ab33
commit 5df12014a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 202 additions and 49 deletions

View File

@ -45,9 +45,29 @@ int main(
return 1;
}
// Sending ping without initing a XRCE session
if (uxr_ping_agent_attempts(&transport.comm, 1000, 10))
{
printf("Success! Agent is up on device '%s'\n", dev);
printf("Success! Agent is up on device '%s' without a session\n", dev);
}
else
{
printf("Sorry, no agent available at device '%s'\n", dev);
return 1;
}
// Sending ping with initing a XRCE session
uxrSession session;
uxr_init_session(&session, &transport.comm, 0xAAAABBBB);
if (!uxr_create_session(&session))
{
printf("Error at create session.\n");
return 1;
}
if (uxr_ping_agent_session(&session, 1000, 1))
{
printf("Success! Agent is up on device '%s' within a session\n", dev);
}
else
{

View File

@ -39,6 +39,7 @@ int main(
return 1;
}
// Sending ping without initing a XRCE session
if (uxr_ping_agent_attempts(&transport.comm, 1000, 10))
{
printf("Success! Agent is up on %s:%s\n", ip, port);
@ -48,6 +49,24 @@ int main(
printf("Sorry, no agent available at %s:%s\n", ip, port);
}
// Sending ping with initing a XRCE session
uxrSession session;
uxr_init_session(&session, &transport.comm, 0xAAAABBBB);
if (!uxr_create_session(&session))
{
printf("Error at create session.\n");
return 1;
}
if (uxr_ping_agent_session(&session, 1000, 1))
{
printf("Success! Agent is up on %s:%s within a session\n", ip, port);
}
else
{
printf("Sorry, no agent available at %s:%s\n", ip, port);
}
uxr_close_tcp_transport(&transport);
return 0;

View File

@ -39,9 +39,29 @@ int main(
return 1;
}
// Sending ping without initing a XRCE session
if (uxr_ping_agent_attempts(&transport.comm, 1000, 10))
{
printf("Success! Agent is up on %s:%s\n", ip, port);
printf("Success! Agent is up on %s:%s without a session\n", ip, port);
}
else
{
printf("Sorry, no agent available at %s:%s\n", ip, port);
return 1;
}
// Sending ping with initing a XRCE session
uxrSession session;
uxr_init_session(&session, &transport.comm, 0xAAAABBBB);
if (!uxr_create_session(&session))
{
printf("Error at create session.\n");
return 1;
}
if (uxr_ping_agent_session(&session, 1000, 1))
{
printf("Success! Agent is up on %s:%s within a session\n", ip, port);
}
else
{

View File

@ -193,6 +193,7 @@ typedef struct uxrSession
void* on_reply_args;
bool on_data_flag;
bool on_pong_flag;
uxrContinuousArgs continuous_args;
#ifdef UCLIENT_PROFILE_MULTITHREAD

View File

@ -35,6 +35,24 @@ extern "C"
#define GET_INFO_MSG_SIZE 8
#define GET_INFO_REQUEST_ID 9
struct uxrSession;
/**
* @brief Checks the availability status of a valid connection with an agent.
* This methods performs a single attempt.
* This method uses an already opened session in order to do not
* interfere with the rest of the application.
* @ingroup general_utils
* @param session Pointer to the uxrSession struct inited.
* @param timeout Time, in milliseconds, for a ping attempt.
* @param attempts Maximum number of ping attempts to be performed.
* @return `true` in case of a successful ping to the agent, `false` otherwise.
*/
bool uxr_ping_agent_session(
struct uxrSession* session,
const int timeout_ms,
const uint8_t attempts);
/**
* @brief Checks the availability status of a valid connection with an agent.
* This methods performs a single attempt.
@ -48,7 +66,7 @@ extern "C"
*/
UXRDLLAPI bool uxr_ping_agent(
uxrCommunication* comm,
const int timeout);
const int timeout_ms);
/**
* @brief Checks the availability status of a valid connection with an agent.
@ -66,7 +84,7 @@ UXRDLLAPI bool uxr_ping_agent(
*/
UXRDLLAPI bool uxr_ping_agent_attempts(
uxrCommunication* comm,
const int timeout,
const int timeout_ms,
const uint8_t attempts);

View File

@ -14,6 +14,7 @@
#include "stream/output_best_effort_stream_internal.h"
#include "stream/output_reliable_stream_internal.h"
#include "stream/seq_num_internal.h"
#include "../serialization/xrce_subheader_internal.h"
#include "../log/log_internal.h"
#include "../../util/time_internal.h"
#include <uxr/client/profile/multithread/multithread.h>
@ -124,6 +125,9 @@ static bool run_session_until_sync(
uxrSession* session,
int timeout);
bool uxr_acknack_pong(
ucdrBuffer* buffer);
//==================================================================
// PUBLIC
//==================================================================
@ -362,9 +366,10 @@ bool uxr_run_session_until_data(
}
while (remaining_time > 0);
UXR_UNLOCK_SESSION(session);
bool ret = session->on_data_flag;
return session->on_data_flag;
UXR_UNLOCK_SESSION(session);
return ret;
}
bool uxr_run_session_until_timeout(
@ -607,6 +612,75 @@ void uxr_flash_output_streams(
//==================================================================
// PRIVATE
//==================================================================
bool uxr_acknack_pong(
ucdrBuffer* buffer)
{
bool success = false;
bool must_be_read = ucdr_buffer_remaining(buffer) > SUBHEADER_SIZE;
if (must_be_read)
{
uint8_t id = 0;
uint8_t flags = 0;
uint16_t length = 0;
uxr_deserialize_submessage_header(buffer, &id, &flags, &length);
success = ucdr_buffer_remaining(buffer) >= length;
if (success && id == SUBMESSAGE_ID_INFO)
{
INFO_Payload info_payload;
success &= uxr_deserialize_BaseObjectReply(buffer, &info_payload.base);
success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_config);
if (info_payload.object_info.optional_config)
{
success &= uxr_deserialize_ObjectVariant(buffer, &info_payload.object_info.config);
}
success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_activity);
if (info_payload.object_info.optional_activity)
{
success &= ucdr_deserialize_uint8_t(buffer, &info_payload.object_info.activity.kind);
if (success && DDS_XRCE_OBJK_AGENT == info_payload.object_info.activity.kind)
{
success &= ucdr_deserialize_int16_t(buffer,
&info_payload.object_info.activity._.agent.availability);
success &= info_payload.object_info.activity._.agent.availability > 0;
}
}
}
}
return success;
}
bool uxr_run_session_until_pong(
uxrSession* session,
int timeout_ms)
{
int64_t start_timestamp = uxr_millis();
int remaining_time = timeout_ms;
uxr_flash_output_streams(session);
session->on_pong_flag = false;
do
{
listen_message_reliably(session, remaining_time);
if (session->on_pong_flag)
{
break;
}
remaining_time = timeout_ms - (int)(uxr_millis() - start_timestamp);
}
while (remaining_time > 0);
bool ret = session->on_pong_flag;
return ret;
}
bool listen_message(
uxrSession* session,
int poll_ms)
@ -779,12 +853,17 @@ void read_message(
uxrSession* session,
ucdrBuffer* ub)
{
uint8_t stream_id_raw; uxrSeqNum seq_num;
uint8_t stream_id_raw = 0;
uxrSeqNum seq_num;
if (uxr_read_session_header(&session->info, ub, &stream_id_raw, &seq_num))
{
uxrStreamId id = uxr_stream_id_from_raw(stream_id_raw, UXR_INPUT_STREAM);
read_stream(session, ub, id, seq_num);
}
else if (stream_id_raw == SESSION_ID_WITHOUT_CLIENT_KEY && uxr_acknack_pong(ub))
{
session->on_pong_flag = true;
}
}
void read_stream(

View File

@ -2,10 +2,12 @@
#include <uxr/client/util/time.h>
#include <uxr/client/core/type/xrce_types.h>
#include <uxr/client/core/session/session.h>
#include <uxr/client/core/session/stream/seq_num.h>
#include <uxr/client/profile/multithread/multithread.h>
#include "../core/serialization/xrce_header_internal.h"
#include "../core/serialization/xrce_subheader_internal.h"
#include "../core/session/submessage_internal.h"
bool serialize_get_info_message(
@ -18,12 +20,42 @@ bool listen_info_message(
uxrCommunication* comm,
const int timeout);
bool uxr_run_session_until_pong(
uxrSession* session,
int timeout_ms);
//==================================================================
// PUBLIC
//==================================================================
bool uxr_ping_agent_session(
uxrSession* session,
const int timeout_ms,
const uint8_t attempts)
{
uint8_t output_buffer[UXR_PING_BUF];
ucdrBuffer ub;
ucdr_init_buffer(&ub, output_buffer, sizeof(output_buffer));
bool ret = false;
if (serialize_get_info_message(&ub))
{
size_t message_length = ucdr_buffer_length(&ub);
UXR_LOCK_SESSION(session);
for (size_t i = 0; !ret && i < attempts; i++)
{
ret = session->comm->send_msg(session->comm->instance, output_buffer, message_length);
ret &= uxr_run_session_until_pong(session, timeout_ms);
}
UXR_UNLOCK_SESSION(session);
}
return ret;
}
bool uxr_ping_agent_attempts(
uxrCommunication* comm,
const int timeout,
const int timeout_ms,
const uint8_t attempts)
{
bool agent_pong = false;
@ -45,11 +77,11 @@ bool uxr_ping_agent_attempts(
message_length);
int64_t timestamp = uxr_millis();
int poll = timeout;
int poll = timeout_ms;
do
{
agent_pong = listen_info_message(comm, timeout);
agent_pong = listen_info_message(comm, timeout_ms);
poll -= (int)(uxr_millis() - timestamp);
timestamp = uxr_millis();
} while (0 < poll && !agent_pong);
@ -63,9 +95,9 @@ bool uxr_ping_agent_attempts(
inline bool uxr_ping_agent(
uxrCommunication* comm,
const int timeout)
const int timeout_ms)
{
return uxr_ping_agent_attempts(comm, timeout, 1);
return uxr_ping_agent_attempts(comm, timeout_ms, 1);
}
//==================================================================
@ -91,39 +123,6 @@ bool serialize_get_info_message(
return res;
}
bool uxr_acknack_pong(
ucdrBuffer* buffer)
{
bool success = true;
INFO_Payload info_payload;
success &= uxr_deserialize_BaseObjectReply(buffer, &info_payload.base);
success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_config);
if (info_payload.object_info.optional_config)
{
success &= uxr_deserialize_ObjectVariant(buffer, &info_payload.object_info.config);
}
success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_activity);
if (info_payload.object_info.optional_activity)
{
success &= ucdr_deserialize_uint8_t(buffer, &info_payload.object_info.activity.kind);
if (success && DDS_XRCE_OBJK_AGENT == info_payload.object_info.activity.kind)
{
success &= ucdr_deserialize_int16_t(buffer,
&info_payload.object_info.activity._.agent.availability);
success &= (bool)info_payload.object_info.activity._.agent.availability;
}
}
else
{
success = false;
}
return success;
}
bool listen_info_message(
uxrCommunication* comm,
const int timeout)
@ -145,10 +144,7 @@ bool listen_info_message(
size_t advance_len = 1 /* uint8_t session_id */
+ 1 /* uint8_t stream_id */
+ 2 /* uint16_t sequence_number */
+ CLIENT_KEY_SIZE
+ 1 /* uint8_t submessage_header_id */
+ 1 /* uint8_t submessage_flags */
+ 2; /* uint16_t submessage_length */
+ CLIENT_KEY_SIZE;
ucdr_advance_buffer(&ub, advance_len);
return uxr_acknack_pong(&ub);