Skip to content

Commit

Permalink
Merge pull request open62541#6046 from jpfr/merge_14_master_5
Browse files Browse the repository at this point in the history
Merge 1.4 to master
  • Loading branch information
jpfr authored Oct 11, 2023
2 parents 46659f7 + 2259727 commit 744ea7e
Show file tree
Hide file tree
Showing 67 changed files with 1,334 additions and 577 deletions.
17 changes: 3 additions & 14 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
# overwritten with more detailed information if git is available.
set(OPEN62541_VER_MAJOR 1)
set(OPEN62541_VER_MINOR 3)
set(OPEN62541_VER_PATCH 6)
set(OPEN62541_VER_PATCH 8)
set(OPEN62541_VER_LABEL "-undefined") # like "-rc1" or "-g4538abcd" or "-g4538abcd-dirty"
set(OPEN62541_VER_COMMIT "unknown-commit")

Expand Down Expand Up @@ -298,9 +298,6 @@ option(UA_FORCE_32BIT "Force compilation as 32-bit executable" OFF)
mark_as_advanced(UA_FORCE_32BIT)

option(UA_FORCE_WERROR "Force compilation with -Werror (or /WX on MSVC)" OFF)
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
set(UA_FORCE_WERROR ON)
endif()

option(UA_ENABLE_DEBUG_SANITIZER "Use sanitizer in debug mode" ON)
mark_as_advanced(UA_ENABLE_DEBUG_SANITIZER)
Expand Down Expand Up @@ -346,14 +343,6 @@ if(UA_ENABLE_PUBSUB_MONITORING)
endif()
endif()

option(UA_ENABLE_PUBSUB_ETH_UADP "Enable the OPC UA Ethernet PubSub support to transport UADP NetworkMessages as payload of Ethernet II frame without IP or UDP headers. This option will include Publish and Subscribe based on EtherType B62C." OFF)
mark_as_advanced(UA_ENABLE_PUBSUB_ETH_UADP)
if(UA_ENABLE_PUBSUB_ETH_UADP)
if(NOT UA_ENABLE_PUBSUB)
message(FATAL_ERROR "For UA_ENABLE_PUBSUB_ETH_UADP PubSub needs to be enabled")
endif()
endif()

option(UA_ENABLE_PUBSUB_BUFMALLOC "Enable allocation with static memory buffer for time critical PubSub parts" OFF)
mark_as_advanced(UA_ENABLE_PUBSUB_BUFMALLOC)
if(UA_ENABLE_PUBSUB_BUFMALLOC)
Expand Down Expand Up @@ -525,8 +514,8 @@ if(UA_ENABLE_ENCRYPTION_LIBRESSL)
find_package(LibreSSL REQUIRED)
list(APPEND open62541_LIBRARIES ${LIBRESSL_LIBRARIES})
if(WIN32)
# Add bestcrypt for random generator
list(APPEND open62541_LIBRARIES bcrypt)
# Add bestcrypt for random generator and ws2_32 for crypto
list(APPEND open62541_LIBRARIES ws2_32 bcrypt)
endif()
endif()

Expand Down
51 changes: 35 additions & 16 deletions arch/eventloop_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t buf
#include "../deps/mqtt-c/src/mqtt.c"

#define MQTT_MESSAGE_MAXLEN (1u << 20) /* 1MB */
#define MQTT_PARAMETERSSIZE 7
#define MQTT_PARAMETERSSIZE 8
#define MQTT_BROKERPARAMETERSSIZE 5 /* Parameters shared by topic connections
* connected to the same broker */

Expand All @@ -83,6 +83,7 @@ static const struct {
{{0, UA_STRING_STATIC("keep-alive")}, &UA_TYPES[UA_TYPES_UINT16], false},
{{0, UA_STRING_STATIC("username")}, &UA_TYPES[UA_TYPES_STRING], false},
{{0, UA_STRING_STATIC("password")}, &UA_TYPES[UA_TYPES_STRING], false},
{{0, UA_STRING_STATIC("validate")}, &UA_TYPES[UA_TYPES_BOOLEAN], false},
{{0, UA_STRING_STATIC("subscribe")}, &UA_TYPES[UA_TYPES_BOOLEAN], false},
{{0, UA_STRING_STATIC("topic")}, &UA_TYPES[UA_TYPES_STRING], true}
};
Expand Down Expand Up @@ -589,8 +590,8 @@ MQTTNetworkCallback(UA_ConnectionManager *tcpCM, uintptr_t connectionId,
}

static MQTTBrokerConnection *
createBrokerConnection(MQTTConnectionManager *mcm,
const UA_KeyValueMap *params) {
createBrokerConnection(MQTTConnectionManager *mcm, const UA_KeyValueMap *params,
UA_Boolean validate) {
/* Allocate connection memory */
MQTTBrokerConnection *bc = (MQTTBrokerConnection*)
UA_calloc(1, sizeof(MQTTBrokerConnection));
Expand Down Expand Up @@ -630,23 +631,15 @@ createBrokerConnection(MQTTConnectionManager *mcm,
if(keepAlive && *keepAlive > 0)
bc->keepalive = *keepAlive;

UA_EventLoop *el = mcm->cm.eventSource.eventLoop;
res = el->addCyclicCallback(el, (UA_Callback)MQTTKeepAliveCallback, NULL, bc,
(UA_Double)(bc->keepalive * UA_DATETIME_SEC),
NULL, UA_TIMER_HANDLE_CYCLEMISS_WITH_CURRENTTIME,
&bc->keepAliveCallbackId);
if(res != UA_STATUSCODE_GOOD) {
removeBrokerConnection(bc);
return NULL;
}

/* Open the Connection. This also sets the broker connection id to the TCP id. */
UA_KeyValuePair tcpParams[2];
UA_KeyValuePair tcpParams[3];
tcpParams[0].key = UA_QUALIFIEDNAME(0, "address");
UA_Variant_setScalar(&tcpParams[0].value, broker, &UA_TYPES[UA_TYPES_STRING]);
tcpParams[1].key = UA_QUALIFIEDNAME(0, "port");
UA_Variant_setScalar(&tcpParams[1].value, port, &UA_TYPES[UA_TYPES_UINT16]);
UA_KeyValueMap kvm = {2, tcpParams};
tcpParams[2].key = UA_QUALIFIEDNAME(0, "validate");
UA_Variant_setScalar(&tcpParams[2].value, &validate, &UA_TYPES[UA_TYPES_BOOLEAN]);
UA_KeyValueMap kvm = {3, tcpParams};

UA_ConnectionManager *tcpCM = mcm->tcpCM;
res = tcpCM->openConnection(tcpCM, &kvm, NULL, bc, MQTTNetworkCallback);
Expand All @@ -655,6 +648,22 @@ createBrokerConnection(MQTTConnectionManager *mcm,
return NULL;
}

/* Return non-null to indicate success */
if(validate) {
removeBrokerConnection(bc);
return (MQTTBrokerConnection*)0x01;
}

UA_EventLoop *el = mcm->cm.eventSource.eventLoop;
res = el->addCyclicCallback(el, (UA_Callback)MQTTKeepAliveCallback, NULL, bc,
(UA_Double)(bc->keepalive * UA_DATETIME_SEC),
NULL, UA_TIMER_HANDLE_CYCLEMISS_WITH_CURRENTTIME,
&bc->keepAliveCallbackId);
if(res != UA_STATUSCODE_GOOD) {
removeBrokerConnection(bc);
return NULL;
}

UA_LOG_DEBUG(bc->mcm->cm.eventSource.eventLoop->logger,
UA_LOGCATEGORY_NETWORK, "MQTT-TCP %u\t| Created broker connection",
(unsigned)bc->tcpConnectionId);
Expand Down Expand Up @@ -757,10 +766,17 @@ MQTT_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
return UA_STATUSCODE_BADCONNECTIONREJECTED;
}

const UA_Boolean *validate = (const UA_Boolean*)
UA_KeyValueMap_getScalar(params, UA_QUALIFIEDNAME(0, "validate"),
&UA_TYPES[UA_TYPES_BOOLEAN]);
if(validate && *validate)
return (createBrokerConnection(mcm, params, true) == 0) ?
UA_STATUSCODE_BADCONNECTIONREJECTED : UA_STATUSCODE_GOOD;

/* Test whether an existing broker connection can be reused.
* Otherwise create a new one. */
MQTTBrokerConnection *bc = findIdenticalBrokerConnection(mcm, params);
if(!bc && !(bc = createBrokerConnection(mcm, params)))
if(!bc && !(bc = createBrokerConnection(mcm, params, false)))
return UA_STATUSCODE_BADNOTCONNECTED;

/* Create the per-topic connection */
Expand Down Expand Up @@ -818,6 +834,9 @@ MQTT_shutdownConnection(UA_ConnectionManager *cm, uintptr_t connectionId) {
tc->topicConnectionState == UA_CONNECTIONSTATE_CLOSED)
return UA_STATUSCODE_GOOD;

/* TODO: Cancel the ongoing select/epoll if this was called from another
* thread. */

UA_EventLoop *el = tc->brokerConnection->mcm->cm.eventSource.eventLoop;
UA_LOG_DEBUG(el->logger, UA_LOGCATEGORY_NETWORK,
"MQTT %u\t| Shutdown called", (unsigned)tc->topicConnectionId);
Expand Down
16 changes: 14 additions & 2 deletions arch/eventloop_posix_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {

/* Poll the registered sockets */
struct epoll_event epoll_events[64];
int events = epoll_wait(el->epollfd, epoll_events, 64,
int epollfd = el->epollfd;
UA_UNLOCK(&el->elMutex);
int events = epoll_wait(epollfd, epoll_events, 64,
(int)(listenTimeout / UA_DATETIME_MSEC));
/* TODO: Replace with pwait2 for higher-precision timeouts once this is
* available in the standard library.
Expand All @@ -79,8 +81,9 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
* (long)(listenTimeout / UA_DATETIME_SEC),
* (long)((listenTimeout % UA_DATETIME_SEC) * 100)
* };
* int events = epoll_pwait2(el->epollfd, epoll_events, 64,
* int events = epoll_pwait2(epollfd, epoll_events, 64,
* precisionTimeout, NULL); */
UA_LOCK(&el->elMutex);

/* Handle error conditions */
if(events == -1) {
Expand All @@ -100,6 +103,13 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
/* Process all received events */
for(int i = 0; i < events; i++) {
UA_RegisteredFD *rfd = (UA_RegisteredFD*)epoll_events[i].data.ptr;

/* The rfd is already registered for removal. Don't process incoming
* events any longer. */
if(rfd->dc.callback)
continue;

/* Get the event */
short revent = 0;
if((epoll_events[i].events & EPOLLIN) == EPOLLIN) {
revent = UA_FDEVENT_IN;
Expand All @@ -108,6 +118,8 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
} else {
revent = UA_FDEVENT_ERR;
}

/* Call the EventSource callback */
rfd->eventSourceCB(rfd->es, rfd, revent);
}
return UA_STATUSCODE_GOOD;
Expand Down
38 changes: 28 additions & 10 deletions arch/eventloop_posix_eth.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static UA_KeyValueRestriction ethManagerParams[ETH_MANAGERPARAMS] = {
{{0, UA_STRING_STATIC("send-bufsize")}, &UA_TYPES[UA_TYPES_UINT32], false, true, false}
};

#define ETH_PARAMETERSSIZE 14
#define ETH_PARAMETERSSIZE 15
#define ETH_PARAMINDEX_ADDR 0
#define ETH_PARAMINDEX_LISTEN 1
#define ETH_PARAMINDEX_IFACE 2
Expand All @@ -42,6 +42,7 @@ static UA_KeyValueRestriction ethManagerParams[ETH_MANAGERPARAMS] = {
#define ETH_PARAMINDEX_TXTIME 11
#define ETH_PARAMINDEX_TXTIME_PICO 12
#define ETH_PARAMINDEX_TXTIME_DROP 13
#define ETH_PARAMINDEX_VALIDATE 14

static UA_KeyValueRestriction ethConnectionParams[ETH_PARAMETERSSIZE+1] = {
{{0, UA_STRING_STATIC("address")}, &UA_TYPES[UA_TYPES_STRING], false, true, false},
Expand All @@ -58,6 +59,7 @@ static UA_KeyValueRestriction ethConnectionParams[ETH_PARAMETERSSIZE+1] = {
{{0, UA_STRING_STATIC("txtime")}, &UA_TYPES[UA_TYPES_DATETIME], false, true, false},
{{0, UA_STRING_STATIC("txtime-pico")}, &UA_TYPES[UA_TYPES_UINT16], false, true, false},
{{0, UA_STRING_STATIC("txtime-drop-late")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false},
{{0, UA_STRING_STATIC("validate")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false},
/* Duplicated address parameter with a scalar value required. For the send-socket case. */
{{0, UA_STRING_STATIC("address")}, &UA_TYPES[UA_TYPES_STRING], true, true, false},
};
Expand Down Expand Up @@ -429,7 +431,8 @@ ETH_connectionSocketCallback(UA_ConnectionManager *cm, UA_RegisteredFD *rfd,
static UA_StatusCode
ETH_openListenConnection(UA_EventLoopPOSIX *el, ETH_FD *conn,
const UA_KeyValueMap *params,
int ifindex, UA_UInt16 etherType) {
int ifindex, UA_UInt16 etherType,
UA_Boolean validate) {
UA_LOCK_ASSERT(&el->elMutex, 1);

/* Bind the socket to interface and EtherType. Don't receive anything else. */
Expand All @@ -438,7 +441,7 @@ ETH_openListenConnection(UA_EventLoopPOSIX *el, ETH_FD *conn,
sll.sll_family = AF_PACKET;
sll.sll_protocol = htons(etherType);
sll.sll_ifindex = ifindex;
if(bind(conn->rfd.fd, (struct sockaddr*)&sll, sizeof(sll)) < 0)
if(!validate && bind(conn->rfd.fd, (struct sockaddr*)&sll, sizeof(sll)) < 0)
return UA_STATUSCODE_BADINTERNALERROR;

/* Immediately register for listen events. Don't have to wait for a
Expand Down Expand Up @@ -489,19 +492,20 @@ ETH_openListenConnection(UA_EventLoopPOSIX *el, ETH_FD *conn,
}

struct packet_mreq mreq;
memset(&mreq, 0, sizeof(struct packet_mreq));
mreq.mr_ifindex = ifindex;
mreq.mr_type = PACKET_MR_MULTICAST;
mreq.mr_alen = ETH_ALEN;
memcpy(mreq.mr_address, addr, ETHER_ADDR_LEN);
int ret = UA_setsockopt(conn->rfd.fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
(char *)&mreq, sizeof(mreq));
if(ret < 0) {
if(!validate && UA_setsockopt(conn->rfd.fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
(char *)&mreq, sizeof(mreq)) < 0) {
UA_LOG_SOCKET_ERRNO_WRAP(
UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH\t| Registering for multicast failed with error %s", errno_str));
"ETH\t| Registering for multicast failed with error %s",
errno_str));
return UA_STATUSCODE_BADINTERNALERROR;
}
}
}

UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH %u\t| Opened an Ethernet listen socket",
Expand Down Expand Up @@ -643,6 +647,15 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
return res;
}

/* Only validate the parameters? */
UA_Boolean validate = false;
const UA_Boolean *validateParam = (const UA_Boolean*)
UA_KeyValueMap_getScalar(params,
ethConnectionParams[ETH_PARAMINDEX_VALIDATE].name,
&UA_TYPES[UA_TYPES_BOOLEAN]);
if(validateParam)
validate = *validateParam;

/* Get the EtherType parameter */
UA_UInt16 etherType = ETH_P_ALL;
const UA_UInt16 *etParam = (const UA_UInt16*)
Expand Down Expand Up @@ -724,9 +737,11 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
(unsigned char*)ifr.ifr_hwaddr.sa_data,
ifindex, etherType);
} else {
res = ETH_openListenConnection(el, conn, params, ifindex, etherType);
res = ETH_openListenConnection(el, conn, params, ifindex, etherType, validate);
}
if(res != UA_STATUSCODE_GOOD)

/* Don't actually open or shut down */
if(validate || res != UA_STATUSCODE_GOOD)
goto cleanup;

/* Register in the EventLoop */
Expand Down Expand Up @@ -765,6 +780,9 @@ ETH_shutdown(UA_POSIXConnectionManager *pcm, ETH_FD *conn) {
return;
}

/* Shutdown the socket to cancel the current select/epoll */
shutdown(conn->rfd.fd, UA_SHUT_RDWR);

UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH %u\t| Shutdown called", (unsigned)conn->rfd.fd);

Expand Down
22 changes: 15 additions & 7 deletions arch/eventloop_posix_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
#endif
};

UA_UNLOCK(&el->elMutex);
int selectStatus = UA_select(highestfd+1, &readset, &writeset, &errset, &tmptv);
UA_LOCK(&el->elMutex);
if(selectStatus < 0) {
/* We will retry, only log the error */
UA_LOG_SOCKET_ERRNO_WRAP(
Expand All @@ -133,26 +135,32 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
}

/* Loop over all registered FD to see if an event arrived. Yes, this is why
* select is slow for many open sockets. */
* select is slow for many open sockets. */
for(size_t i = 0; i < el->fdsSize; i++) {
UA_RegisteredFD *rfd = el->fds[i];
UA_FD fd = rfd->fd;

/* Error Event */
/* The rfd is already registered for removal. Don't process incoming
* events any longer. */
if(rfd->dc.callback)
continue;

/* Event signaled for the fd? */
short event = 0;
if(FD_ISSET(fd, &readset)) {
if(FD_ISSET(rfd->fd, &readset)) {
event = UA_FDEVENT_IN;
} else if(FD_ISSET(fd, &writeset)) {
} else if(FD_ISSET(rfd->fd, &writeset)) {
event = UA_FDEVENT_OUT;
} else if(FD_ISSET(fd, &errset)) {
} else if(FD_ISSET(rfd->fd, &errset)) {
event = UA_FDEVENT_ERR;
} else {
continue;
}

UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
"Processing event %u on fd %u", (unsigned)event, (unsigned)fd);
"Processing event %u on fd %u", (unsigned)event,
(unsigned)rfd->fd);

/* Call the EventSource callback */
rfd->eventSourceCB(rfd->es, rfd, event);

/* The fd has removed itself */
Expand Down
Loading

0 comments on commit 744ea7e

Please sign in to comment.