From 497b6a7b658082df4d4438e072389e4d347bb97c Mon Sep 17 00:00:00 2001 From: Bryan Biedenkapp Date: Sat, 2 Nov 2024 19:50:17 -0400 Subject: [PATCH] allow FNE PUT /dmr/rid and /p25/rid to target *all* connected peers; correct naming of packet data dumping configuration parameter; continue some work on P25 PDU data and VTUN; adjust P25 PDU ACK_RSP; --- configs/fne-config.example.yml | 2 +- src/fne/HostFNE.cpp | 56 +++- src/fne/HostFNE.h | 6 + src/fne/network/FNENetwork.cpp | 6 +- src/fne/network/FNENetwork.h | 2 +- src/fne/network/RESTAPI.cpp | 26 +- src/fne/network/callhandler/TagDMRData.cpp | 36 ++- src/fne/network/callhandler/TagP25Data.cpp | 35 ++- .../callhandler/packetdata/DMRPacketData.cpp | 4 +- .../callhandler/packetdata/P25PacketData.cpp | 240 ++++++++++++------ .../callhandler/packetdata/P25PacketData.h | 26 +- src/host/p25/packet/Data.cpp | 35 +-- 12 files changed, 329 insertions(+), 145 deletions(-) diff --git a/configs/fne-config.example.yml b/configs/fne-config.example.yml index bc46a707..8264e5fc 100644 --- a/configs/fne-config.example.yml +++ b/configs/fne-config.example.yml @@ -69,7 +69,7 @@ master: # Flag indicating whether packet data will be passed. disablePacketData: false # Flag indicating whether verbose dumping of data packets is enabled. - dumpDataPacket: false + dumpPacketData: false # Delay from when a call on a parrot TG ends to when the playback starts (in milliseconds). parrotDelay: 2000 diff --git a/src/fne/HostFNE.cpp b/src/fne/HostFNE.cpp index 8d370729..16d55d07 100644 --- a/src/fne/HostFNE.cpp +++ b/src/fne/HostFNE.cpp @@ -206,6 +206,8 @@ int HostFNE::run() #if !defined(_WIN32) if (!Thread::runAsThread(this, threadVirtualNetworking)) return EXIT_FAILURE; + if (!Thread::runAsThread(this, threadVirtualNetworkingClock)) + return EXIT_FAILURE; #endif // !defined(_WIN32) /* ** Main execution loop @@ -836,7 +838,7 @@ void* HostFNE::threadVirtualNetworking(void* arg) if (th != nullptr) { ::pthread_detach(th->thread); - std::string threadName("fne:vtun-loop"); + std::string threadName("fne:vtun-net-rx"); HostFNE* fne = static_cast(th->obj); if (fne == nullptr) { g_killed = true; @@ -863,7 +865,6 @@ void* HostFNE::threadVirtualNetworking(void* arg) stopWatch.start(); while (!g_killed) { - uint32_t ms = stopWatch.elapsed(); stopWatch.start(); uint8_t packet[DEFAULT_MTU_SIZE]; @@ -882,6 +883,55 @@ void* HostFNE::threadVirtualNetworking(void* arg) } } + Thread::sleep(2U); + } + } + + LogDebug(LOG_HOST, "[STOP] %s", threadName.c_str()); + delete th; + } + + return nullptr; +} + +/* Entry point to virtual networking clocking thread. */ + +void* HostFNE::threadVirtualNetworkingClock(void* arg) +{ + thread_t* th = (thread_t*)arg; + if (th != nullptr) { + ::pthread_detach(th->thread); + + std::string threadName("fne:vtun-clock"); + HostFNE* fne = static_cast(th->obj); + if (fne == nullptr) { + g_killed = true; + LogDebug(LOG_HOST, "[FAIL] %s", threadName.c_str()); + } + + if (g_killed) { + delete th; + return nullptr; + } + + if (!fne->m_vtunEnabled) { + delete th; + return nullptr; + } + + LogDebug(LOG_HOST, "[ OK ] %s", threadName.c_str()); +#ifdef _GNU_SOURCE + ::pthread_setname_np(th->thread, threadName.c_str()); +#endif // _GNU_SOURCE + + if (fne->m_tun != nullptr) { + StopWatch stopWatch; + stopWatch.start(); + + while (!g_killed) { + uint32_t ms = stopWatch.elapsed(); + stopWatch.start(); + // clock traffic handler switch (fne->m_packetDataMode) { case PacketDataMode::DMR: @@ -893,7 +943,7 @@ void* HostFNE::threadVirtualNetworking(void* arg) break; } - Thread::sleep(5U); + Thread::sleep(2U); } } diff --git a/src/fne/HostFNE.h b/src/fne/HostFNE.h index e86ea372..77e983de 100644 --- a/src/fne/HostFNE.h +++ b/src/fne/HostFNE.h @@ -163,6 +163,12 @@ class HOST_SW_API HostFNE { * @returns void* (Ignore) */ static void* threadVirtualNetworking(void* arg); + /** + * @brief Entry point to virtual networking clocking thread. + * @param arg Instance of the thread_t structure. + * @returns void* (Ignore) + */ + static void* threadVirtualNetworkingClock(void* arg); #endif // !defined(_WIN32) /** * @brief Processes any peer network traffic. diff --git a/src/fne/network/FNENetwork.cpp b/src/fne/network/FNENetwork.cpp index 81f0c44a..84104871 100644 --- a/src/fne/network/FNENetwork.cpp +++ b/src/fne/network/FNENetwork.cpp @@ -94,7 +94,7 @@ FNENetwork::FNENetwork(HostFNE* host, const std::string& address, uint16_t port, m_influxBucket("dvm"), m_influxLogRawData(false), m_disablePacketData(false), - m_dumpDataPacket(false), + m_dumpPacketData(false), m_reportPeerPing(reportPeerPing), m_verbose(verbose) { @@ -153,7 +153,7 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions) m_filterTerminators = conf["filterTerminators"].as(true); m_disablePacketData = conf["disablePacketData"].as(false); - m_dumpDataPacket = conf["dumpDataPacket"].as(false); + m_dumpPacketData = conf["dumpPacketData"].as(false); /* ** Drop Unit to Unit Peers @@ -176,7 +176,7 @@ void FNENetwork::setOptions(yaml::Node& conf, bool printOptions) LogWarning(LOG_NET, "NOTICE: All P25 ADJ_STS_BCAST messages will be blocked and dropped!"); } LogInfo(" Disable Packet Data: %s", m_disablePacketData ? "yes" : "no"); - LogInfo(" Dump Packet Data: %s", m_dumpDataPacket ? "yes" : "no"); + LogInfo(" Dump Packet Data: %s", m_dumpPacketData ? "yes" : "no"); LogInfo(" Disable P25 ADJ_STS_BCAST to external peers: %s", m_disallowExtAdjStsBcast ? "yes" : "no"); LogInfo(" Allow conventional sites to override affiliation and receive all traffic: %s", m_allowConvSiteAffOverride ? "yes" : "no"); LogInfo(" Restrict grant response by affiliation: %s", m_restrictGrantToAffOnly ? "yes" : "no"); diff --git a/src/fne/network/FNENetwork.h b/src/fne/network/FNENetwork.h index 779c8558..a6ea52e6 100644 --- a/src/fne/network/FNENetwork.h +++ b/src/fne/network/FNENetwork.h @@ -475,7 +475,7 @@ namespace network influxdb::ServerInfo m_influxServer; bool m_disablePacketData; - bool m_dumpDataPacket; + bool m_dumpPacketData; bool m_reportPeerPing; bool m_verbose; diff --git a/src/fne/network/RESTAPI.cpp b/src/fne/network/RESTAPI.cpp index 62491805..46e39825 100644 --- a/src/fne/network/RESTAPI.cpp +++ b/src/fne/network/RESTAPI.cpp @@ -1419,12 +1419,6 @@ void RESTAPI::restAPI_PutDMRRID(const HTTPPayload& request, HTTPPayload& reply, return; } - // validate peer ID is a integer within the JSON blob - if (!req["peerId"].is()) { - errorPayload(reply, "peer ID was not valid"); - return; - } - // validate destination ID is a integer within the JSON blob if (!req["dstId"].is()) { errorPayload(reply, "destination ID was not valid"); @@ -1437,15 +1431,10 @@ void RESTAPI::restAPI_PutDMRRID(const HTTPPayload& request, HTTPPayload& reply, return; } - uint32_t peerId = req["peerId"].get(); + uint32_t peerId = req["peerId"].getDefault(0U); uint32_t dstId = req["dstId"].get(); uint8_t slot = req["slot"].get(); - if (peerId == 0U) { - errorPayload(reply, "peer ID was not valid"); - return; - } - if (dstId == 0U) { errorPayload(reply, "destination ID was not valid"); return; @@ -1500,26 +1489,15 @@ void RESTAPI::restAPI_PutP25RID(const HTTPPayload& request, HTTPPayload& reply, return; } - // validate peer ID is a integer within the JSON blob - if (!req["peerId"].is()) { - errorPayload(reply, "peer ID was not valid"); - return; - } - // validate destination ID is a integer within the JSON blob if (!req["dstId"].is()) { errorPayload(reply, "destination ID was not valid"); return; } - uint32_t peerId = req["peerId"].get(); + uint32_t peerId = req["peerId"].getDefault(0U); uint32_t dstId = req["dstId"].get(); - if (peerId == 0U) { - errorPayload(reply, "peer ID was not valid"); - return; - } - if (dstId == 0U) { errorPayload(reply, "destination ID was not valid"); return; diff --git a/src/fne/network/callhandler/TagDMRData.cpp b/src/fne/network/callhandler/TagDMRData.cpp index 315a1170..137da596 100644 --- a/src/fne/network/callhandler/TagDMRData.cpp +++ b/src/fne/network/callhandler/TagDMRData.cpp @@ -952,5 +952,39 @@ void TagDMRData::write_CSBK(uint32_t peerId, uint8_t slot, lc::CSBK* csbk) return; } - m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true); + if (peerId > 0U) { + m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true); + } else { + // repeat traffic to the connected peers + if (m_network->m_peers.size() > 0U) { + uint32_t i = 0U; + for (auto peer : m_network->m_peers) { + // every 5 peers flush the queue + if (i % 5U == 0U) { + m_network->m_frameQueue->flushQueue(); + } + + m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, true); + if (m_network->m_debug) { + LogDebug(LOG_NET, "DMR, peer = %u, slotNo = %u, len = %u, stream = %u", + peer.first, slot, messageLength, streamId); + } + i++; + } + m_network->m_frameQueue->flushQueue(); + } + + // repeat traffic to external peers + if (m_network->m_host->m_peerNetworks.size() > 0U) { + for (auto peer : m_network->m_host->m_peerNetworks) { + uint32_t dstPeerId = peer.second->getPeerId(); + + peer.second->writeMaster({ NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_DMR }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId); + if (m_network->m_debug) { + LogDebug(LOG_NET, "DMR, peer = %u, slotNo = %u, len = %u, stream = %u", + dstPeerId, slot, messageLength, streamId); + } + } + } + } } diff --git a/src/fne/network/callhandler/TagP25Data.cpp b/src/fne/network/callhandler/TagP25Data.cpp index 4db93cbd..6e22416e 100644 --- a/src/fne/network/callhandler/TagP25Data.cpp +++ b/src/fne/network/callhandler/TagP25Data.cpp @@ -1283,5 +1283,38 @@ void TagP25Data::write_TSDU(uint32_t peerId, lc::TSBK* tsbk) } uint32_t streamId = m_network->createStreamId(); - m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true); + if (peerId > 0U) { + m_network->writePeer(peerId, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, false, true); + } else { + // repeat traffic to the connected peers + if (m_network->m_peers.size() > 0U) { + uint32_t i = 0U; + for (auto peer : m_network->m_peers) { + // every 5 peers flush the queue + if (i % 5U == 0U) { + m_network->m_frameQueue->flushQueue(); + } + + m_network->writePeer(peer.first, { NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId, true); + if (m_network->m_debug) { + LogDebug(LOG_NET, "P25, peer = %u, len = %u, streamId = %u", + peer.first, messageLength, streamId); + } + i++; + } + m_network->m_frameQueue->flushQueue(); + } + + // repeat traffic to external peers + if (m_network->m_host->m_peerNetworks.size() > 0U) { + for (auto peer : m_network->m_host->m_peerNetworks) { + uint32_t dstPeerId = peer.second->getPeerId(); + peer.second->writeMaster({ NET_FUNC::PROTOCOL, NET_SUBFUNC::PROTOCOL_SUBFUNC_P25 }, message.get(), messageLength, RTP_END_OF_CALL_SEQ, streamId); + if (m_network->m_debug) { + LogDebug(LOG_NET, "P25, peer = %u, len = %u, streamId = %u", + dstPeerId, messageLength, streamId); + } + } + } + } } diff --git a/src/fne/network/callhandler/packetdata/DMRPacketData.cpp b/src/fne/network/callhandler/packetdata/DMRPacketData.cpp index e94c62be..528eca05 100644 --- a/src/fne/network/callhandler/packetdata/DMRPacketData.cpp +++ b/src/fne/network/callhandler/packetdata/DMRPacketData.cpp @@ -253,8 +253,8 @@ void DMRPacketData::dispatch(uint32_t peerId, dmr::data::NetData& dmrData, const LogWarning(LOG_NET, P25_PDU_STR ", failed CRC-32 check, blocks %u, len %u", status->header.getBlocksToFollow(), status->pduDataOffset); } - if (m_network->m_dumpDataPacket) { - Utils::dump(1U, "PDU Packet", status->pduUserData, status->pduDataOffset); + if (m_network->m_dumpPacketData) { + Utils::dump(1U, "ISP PDU Packet", status->pduUserData, status->pduDataOffset); } } } diff --git a/src/fne/network/callhandler/packetdata/P25PacketData.cpp b/src/fne/network/callhandler/packetdata/P25PacketData.cpp index 4f7e75b6..a0671936 100644 --- a/src/fne/network/callhandler/packetdata/P25PacketData.cpp +++ b/src/fne/network/callhandler/packetdata/P25PacketData.cpp @@ -40,6 +40,12 @@ using namespace p25::sndcp; const uint8_t DATA_CALL_COLL_TIMEOUT = 60U; +// --------------------------------------------------------------------------- +// Static Class Members +// --------------------------------------------------------------------------- + +std::timed_mutex P25PacketData::m_vtunMutex; + // --------------------------------------------------------------------------- // Public Class Members // --------------------------------------------------------------------------- @@ -52,8 +58,8 @@ P25PacketData::P25PacketData(FNENetwork* network, TagP25Data* tag, bool debug) : m_dataFrames(), m_status(), m_arpTable(), - m_readyForPkt(), - m_suNotReadyTimeout(), + m_readyForNextPkt(), + m_suSendSeq(), m_debug(debug) { assert(network != nullptr); @@ -129,6 +135,7 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee status->llId = status->header.getLLId(); m_status[peerId] = status; + m_readyForNextPkt[status->llId] = true; // is this a response header? if (status->header.getFormat() == PDUFormatType::RSP) { @@ -141,7 +148,6 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee if (status->header.getSAP() != PDUSAP::EXT_ADDR && status->header.getFormat() != PDUFormatType::UNCONFIRMED) { - m_readyForPkt[status->llId] = true; m_suSendSeq[status->llId] = 0U; } @@ -218,7 +224,6 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee status->extendedAddress = true; status->llId = status->header.getSrcLLId(); - m_readyForPkt[status->llId] = true; m_suSendSeq[status->llId] = 0U; offset += P25_PDU_FEC_LENGTH_BYTES; @@ -277,7 +282,7 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee else LogWarning(LOG_NET, P25_PDU_STR ", unfixable PDU data (1/2 rate or CRC), block %u", i); - if (m_network->m_dumpDataPacket) { + if (m_network->m_dumpPacketData) { Utils::dump(1U, "Unfixable PDU Data", buffer, P25_PDU_FEC_LENGTH_BYTES); } } @@ -325,6 +330,8 @@ bool P25PacketData::processFrame(const uint8_t* data, uint32_t len, uint32_t pee void P25PacketData::processPacketFrame(const uint8_t* data, uint32_t len, bool alreadyQueued) { + uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + #if !defined(_WIN32) struct ip* ipHeader = (struct ip*)data; @@ -337,30 +344,34 @@ void P25PacketData::processPacketFrame(const uint8_t* data, uint32_t len, bool a uint8_t proto = ipHeader->ip_p; uint16_t pktLen = Utils::reverseEndian(ipHeader->ip_len); // bryanb: this could be problematic on different endianness - LogMessage(LOG_NET, "P25, VTUN -> PDU IP Data, srcIp = %s, dstIp = %s, pktLen = %u, proto = %02X", srcIp, dstIp, pktLen, proto); #if DEBUG_P25_PDU_DATA Utils::dump(1U, "P25PacketData::processPacketFrame() packet", data, pktLen); #endif - VTUNDataFrame dataFrame; - dataFrame.buffer = new uint8_t[len]; - ::memcpy(dataFrame.buffer, data, len); - dataFrame.bufferLen = len; - dataFrame.pktLen = pktLen; + VTUNDataFrame* dataFrame = new VTUNDataFrame(); + dataFrame->buffer = new uint8_t[len]; + ::memcpy(dataFrame->buffer, data, len); + dataFrame->bufferLen = len; + dataFrame->pktLen = pktLen; + dataFrame->proto = proto; uint32_t dstLlId = getLLIdAddress(Utils::reverseEndian(ipHeader->ip_dst.s_addr)); - dataFrame.srcHWAddr = WUID_FNE; - dataFrame.srcProtoAddr = Utils::reverseEndian(ipHeader->ip_src.s_addr); - dataFrame.tgtHWAddr = dstLlId; - dataFrame.tgtProtoAddr = Utils::reverseEndian(ipHeader->ip_dst.s_addr); + dataFrame->srcHWAddr = WUID_FNE; + dataFrame->srcProtoAddr = Utils::reverseEndian(ipHeader->ip_src.s_addr); + dataFrame->tgtHWAddr = dstLlId; + dataFrame->tgtProtoAddr = Utils::reverseEndian(ipHeader->ip_dst.s_addr); + + dataFrame->timestamp = now; if (dstLlId == 0U) { LogMessage(LOG_NET, "P25, no ARP entry for, dstIp = %s", dstIp); write_PDU_ARP(Utils::reverseEndian(ipHeader->ip_dst.s_addr)); } + m_vtunMutex.try_lock_for(std::chrono::milliseconds(60)); m_dataFrames.push_back(dataFrame); + m_vtunMutex.unlock(); #endif // !defined(_WIN32) } @@ -368,61 +379,92 @@ void P25PacketData::processPacketFrame(const uint8_t* data, uint32_t len, bool a void P25PacketData::clock(uint32_t ms) { - // transmit queued data frames - if (m_dataFrames.size() > 0) { - auto& dataFrame = m_dataFrames[0]; + uint64_t now = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - if (dataFrame.tgtHWAddr == 0U) { - uint32_t dstLlId = getLLIdAddress(dataFrame.tgtProtoAddr); - if (dstLlId == 0U) - return; + if (m_dataFrames.size() == 0U) { + return; + } - dataFrame.tgtHWAddr = dstLlId; - } + // transmit queued data frames + bool processed = false; + m_vtunMutex.try_lock_for(std::chrono::milliseconds(60)); + auto& dataFrame = m_dataFrames[0]; + m_vtunMutex.unlock(); + + if (dataFrame != nullptr) { + if (now > dataFrame->timestamp + 500U) { + processed = true; + + // do we have a valid target address? + if (dataFrame->tgtHWAddr == 0U) { + uint32_t dstLlId = getLLIdAddress(dataFrame->tgtProtoAddr); + if (dstLlId == 0U) { + processed = false; + goto pkt_clock_abort; + } - // don't allow another packet to go out if we haven't acked the previous - if (!m_readyForPkt[dataFrame.tgtHWAddr]) { - m_suNotReadyTimeout[dataFrame.tgtHWAddr].clock(ms); - if (m_suNotReadyTimeout[dataFrame.tgtHWAddr].isRunning() && m_suNotReadyTimeout[dataFrame.tgtHWAddr].hasExpired()) { - m_suNotReadyTimeout[dataFrame.tgtHWAddr].stop(); - m_readyForPkt[dataFrame.tgtHWAddr] = true; + dataFrame->tgtHWAddr = dstLlId; } - return; - } + // is the SU ready for the next packet? + auto ready = std::find_if(m_readyForNextPkt.begin(), m_readyForNextPkt.end(), [=](ReadyForNextPktPair x) { return x.first == dataFrame->tgtHWAddr; }); + if (ready != m_readyForNextPkt.end()) { + if (!ready->second) { + processed = false; + goto pkt_clock_abort; + } + } else { + processed = false; + goto pkt_clock_abort; + } + + m_readyForNextPkt[dataFrame->tgtHWAddr] = false; + + std::string srcIp = __IP_FROM_UINT(dataFrame->srcProtoAddr); + std::string tgtIp = __IP_FROM_UINT(dataFrame->tgtProtoAddr); + + LogMessage(LOG_NET, "P25, VTUN -> PDU IP Data, srcIp = %s (%u), dstIp = %s (%u), pktLen = %u, proto = %02X", + srcIp.c_str(), dataFrame->srcHWAddr, tgtIp.c_str(), dataFrame->tgtHWAddr, dataFrame->pktLen, dataFrame->proto); + + // assemble a P25 PDU frame header for transport... + data::DataHeader rspHeader = data::DataHeader(); + rspHeader.setFormat(PDUFormatType::CONFIRMED); + rspHeader.setMFId(MFG_STANDARD); + rspHeader.setAckNeeded(true); + rspHeader.setOutbound(true); + rspHeader.setSAP(PDUSAP::EXT_ADDR); + rspHeader.setLLId(dataFrame->tgtHWAddr); + rspHeader.setBlocksToFollow(1U); - m_readyForPkt[dataFrame.tgtHWAddr] = false; - m_suNotReadyTimeout[dataFrame.tgtHWAddr] = Timer(1000U, 5U, 0U); - m_suNotReadyTimeout[dataFrame.tgtHWAddr].start(); - - // assemble a P25 PDU frame header for transport... - data::DataHeader rspHeader = data::DataHeader(); - rspHeader.setFormat(PDUFormatType::CONFIRMED); - rspHeader.setMFId(MFG_STANDARD); - rspHeader.setAckNeeded(true); - rspHeader.setOutbound(true); - rspHeader.setSAP(PDUSAP::EXT_ADDR); - rspHeader.setLLId(dataFrame.tgtHWAddr); - rspHeader.setBlocksToFollow(1U); - - rspHeader.setEXSAP(PDUSAP::PACKET_DATA); - rspHeader.setSrcLLId(WUID_FNE); - - rspHeader.calculateLength(dataFrame.pktLen); - uint32_t pduLength = rspHeader.getPDULength(); - - UInt8Array __pduUserData = std::make_unique(pduLength); - uint8_t* pduUserData = __pduUserData.get(); - ::memset(pduUserData, 0x00U, pduLength); - ::memcpy(pduUserData + 4U, dataFrame.buffer, dataFrame.pktLen); + rspHeader.setEXSAP(PDUSAP::PACKET_DATA); + rspHeader.setSrcLLId(WUID_FNE); + + rspHeader.calculateLength(dataFrame->pktLen); + uint32_t pduLength = rspHeader.getPDULength(); + + UInt8Array __pduUserData = std::make_unique(pduLength); + uint8_t* pduUserData = __pduUserData.get(); + ::memset(pduUserData, 0x00U, pduLength); + ::memcpy(pduUserData + 4U, dataFrame->buffer, dataFrame->pktLen); #if DEBUG_P25_PDU_DATA - Utils::dump(1U, "P25PacketData::clock() pduUserData", pduUserData, pduLength); + Utils::dump(1U, "P25PacketData::clock() pduUserData", pduUserData, pduLength); #endif - dispatchUserFrameToFNE(rspHeader, true, pduUserData); + dispatchUserFrameToFNE(rspHeader, true, pduUserData); + } + } - delete[] dataFrame.buffer; - m_dataFrames.pop_front(); +pkt_clock_abort: + m_vtunMutex.try_lock_for(std::chrono::milliseconds(60)); + m_dataFrames.pop_front(); + if (processed) { + if (dataFrame->buffer != nullptr) + delete[] dataFrame->buffer; + delete dataFrame; + } else { + // requeue packet + m_dataFrames.push_back(dataFrame); } + m_vtunMutex.unlock(); } // --------------------------------------------------------------------------- @@ -449,19 +491,21 @@ void P25PacketData::dispatch(uint32_t peerId) } } - if (m_network->m_dumpDataPacket && status->dataBlockCnt > 0U) { - Utils::dump(1U, "PDU Packet", status->pduUserData, status->pduUserDataLength); + if (m_network->m_dumpPacketData && status->dataBlockCnt > 0U) { + Utils::dump(1U, "ISP PDU Packet", status->pduUserData, status->pduUserDataLength); } if (status->header.getFormat() == PDUFormatType::RSP) { LogMessage(LOG_NET, P25_PDU_STR ", ISP, response, fmt = $%02X, rspClass = $%02X, rspType = $%02X, rspStatus = $%02X, llId = %u, srcLlId = %u", status->header.getFormat(), status->header.getResponseClass(), status->header.getResponseType(), status->header.getResponseStatus(), status->header.getLLId(), status->header.getSrcLLId()); -/* + if (status->header.getResponseClass() == PDUAckClass::ACK && status->header.getResponseType() == PDUAckType::ACK) { - m_readyForPkt[status->header.getSrcLLId()] = true; + m_readyForNextPkt[status->header.getSrcLLId()] = true; } -*/ + + write_PDU_Ack_Response(status->header.getResponseClass(), status->header.getResponseType(), status->header.getResponseStatus(), + status->header.getLLId(), status->header.getSrcLLId()); return; } @@ -507,9 +551,17 @@ void P25PacketData::dispatch(uint32_t peerId) LogWarning(LOG_NET, P25_PDU_STR ", ARP reply, %u is trying to masquerade as us...", srcHWAddr); } else { m_arpTable[srcHWAddr] = srcProtoAddr; - } - m_readyForPkt[srcHWAddr] = true; + // is the SU ready for the next packet? + auto ready = std::find_if(m_readyForNextPkt.begin(), m_readyForNextPkt.end(), [=](ReadyForNextPktPair x) { return x.first == srcHWAddr; }); + if (ready != m_readyForNextPkt.end()) { + if (!ready->second) { + m_readyForNextPkt[srcHWAddr] = true; + } + } else { + m_readyForNextPkt[srcHWAddr] = true; + } + } } #else break; @@ -540,7 +592,45 @@ void P25PacketData::dispatch(uint32_t peerId) uint8_t proto = ipHeader->ip_p; uint16_t pktLen = Utils::reverseEndian(ipHeader->ip_len); // bryanb: this could be problematic on different endianness - LogMessage(LOG_NET, "P25, PDU -> VTUN, IP Data, srcIp = %s, dstIp = %s, pktLen = %u, proto = %02X", srcIp, dstIp, pktLen, proto); + // reflect broadcast messages back to the CAI network + bool handled = false; + if (status->header.getLLId() == WUID_ALL) { + LogMessage(LOG_NET, "P25, PDU -> VTUN, IP Data, repeated to CAI, broadcast packet, dstIp = %s (%u)", + dstIp, status->header.getLLId()); + + dispatchUserFrameToFNE(status->header, status->extendedAddress, status->pduUserData); + handled = true; + + // is the source SU one we have proper ARP entries for? + auto arpEntry = std::find_if(m_arpTable.begin(), m_arpTable.end(), [=](ArpTablePair x) { return x.first == status->header.getSrcLLId(); }); + if (arpEntry == m_arpTable.end()) { + uint32_t srcProtoAddr = Utils::reverseEndian(ipHeader->ip_src.s_addr); + LogMessage(LOG_NET, P25_PDU_STR ", adding ARP entry, %s is at %u", __IP_FROM_UINT(srcProtoAddr).c_str(), status->header.getSrcLLId()); + m_arpTable[status->header.getSrcLLId()] = Utils::reverseEndian(ipHeader->ip_src.s_addr); + } + } + + // is the target SU one we have proper ARP entries for? + auto arpEntry = std::find_if(m_arpTable.begin(), m_arpTable.end(), [=](ArpTablePair x) { return x.first == status->header.getLLId(); }); + if (arpEntry != m_arpTable.end()) { + LogMessage(LOG_NET, "P25, PDU -> VTUN, IP Data, repeated to CAI, destination IP has a CAI ARP table entry, dstIp = %s (%u)", + dstIp, status->header.getLLId()); + + dispatchUserFrameToFNE(status->header, status->extendedAddress, status->pduUserData); + handled = true; + + // is the source SU one we have proper ARP entries for? + auto arpEntry = std::find_if(m_arpTable.begin(), m_arpTable.end(), [=](ArpTablePair x) { return x.first == status->header.getSrcLLId(); }); + if (arpEntry == m_arpTable.end()) { + uint32_t srcProtoAddr = Utils::reverseEndian(ipHeader->ip_src.s_addr); + LogMessage(LOG_NET, P25_PDU_STR ", adding ARP entry, %s is at %u", __IP_FROM_UINT(srcProtoAddr).c_str(), status->header.getSrcLLId()); + m_arpTable[status->header.getSrcLLId()] = Utils::reverseEndian(ipHeader->ip_src.s_addr); + } + } + + // transmit packet to IP network + LogMessage(LOG_NET, "P25, PDU -> VTUN, IP Data, srcIp = %s (%u), dstIp = %s (%u), pktLen = %u, proto = %02X", + srcIp, status->header.getSrcLLId(), dstIp, status->header.getLLId(), pktLen, proto); UInt8Array __ipFrame = std::make_unique(pktLen); uint8_t* ipFrame = __ipFrame.get(); @@ -553,8 +643,10 @@ void P25PacketData::dispatch(uint32_t peerId) LogError(LOG_NET, P25_PDU_STR ", failed to write IP frame to virtual tunnel, len %u", pktLen); } - write_PDU_Ack_Response(PDUAckClass::ACK, PDUAckType::ACK, status->header.getNs(), (status->extendedAddress) ? status->header.getSrcLLId() : status->header.getLLId()); - m_readyForPkt[status->header.getSrcLLId()] = true; + // if the packet is unhandled and sent off to VTUN; ack the packet so the sender knows we received it + if (!handled) { + write_PDU_Ack_Response(PDUAckClass::ACK, PDUAckType::ACK, status->header.getNs(), status->header.getSrcLLId(), status->header.getLLId()); + } #endif // !defined(_WIN32) } break; @@ -849,11 +941,9 @@ void P25PacketData::write_PDU_Ack_Response(uint8_t ackClass, uint8_t ackType, ui rspHeader.setLLId(llId); if (srcLlId > 0U) { rspHeader.setSrcLLId(srcLlId); - rspHeader.setFullMessage(false); - } - else { - rspHeader.setFullMessage(true); } + rspHeader.setFullMessage(true); + rspHeader.setBlocksToFollow(0U); dispatchUserFrameToFNE(rspHeader, srcLlId > 0U, nullptr); @@ -921,7 +1011,7 @@ void P25PacketData::write_PDU_User(uint32_t peerId, network::PeerNetwork* peerNe edac::CRC::addCRC32(pduUserData, packetLength); } - if (m_network->m_dumpDataPacket) { + if (m_network->m_dumpPacketData) { Utils::dump("OSP PDU User Data", pduUserData, packetLength); } diff --git a/src/fne/network/callhandler/packetdata/P25PacketData.h b/src/fne/network/callhandler/packetdata/P25PacketData.h index 6a388e35..b5187c7e 100644 --- a/src/fne/network/callhandler/packetdata/P25PacketData.h +++ b/src/fne/network/callhandler/packetdata/P25PacketData.h @@ -90,17 +90,20 @@ namespace network */ class VTUNDataFrame { public: - uint32_t srcHWAddr; - uint32_t srcProtoAddr; - uint32_t tgtHWAddr; - uint32_t tgtProtoAddr; + uint32_t srcHWAddr; //! Source Hardware Address + uint32_t srcProtoAddr; //! Source Protocol Address + uint32_t tgtHWAddr; //! Target Hardware Address + uint32_t tgtProtoAddr; //! Target Protocol Address - uint8_t* buffer; - uint32_t bufferLen; + uint8_t* buffer; //! Raw data buffer + uint32_t bufferLen; //! Length of raw data buffer - uint16_t pktLen; + uint16_t pktLen; //! Packet Length + uint8_t proto; //! Packet Protocol + + uint64_t timestamp; //! Timestamp in milliseconds }; - std::deque m_dataFrames; + std::deque m_dataFrames; /** * @brief Represents the receive status of a call. @@ -161,13 +164,16 @@ namespace network typedef std::pair StatusMapPair; std::unordered_map m_status; + typedef std::pair ArpTablePair; std::unordered_map m_arpTable; - std::unordered_map m_readyForPkt; - std::unordered_map m_suNotReadyTimeout; + typedef std::pair ReadyForNextPktPair; + std::unordered_map m_readyForNextPkt; std::unordered_map m_suSendSeq; bool m_debug; + static std::timed_mutex m_vtunMutex; + /** * @brief Helper to dispatch PDU user data. * @param peerId Peer ID. diff --git a/src/host/p25/packet/Data.cpp b/src/host/p25/packet/Data.cpp index bea3c1f1..de8c6e00 100644 --- a/src/host/p25/packet/Data.cpp +++ b/src/host/p25/packet/Data.cpp @@ -381,7 +381,7 @@ bool Data::process(uint8_t* data, uint32_t len) LogMessage(LOG_RF, P25_PDU_STR ", ISP, response, OSP ACK RETRY, llId = %u, exceeded retries, undeliverable", m_rfDataHeader.getLLId()); - writeRF_PDU_Ack_Response(PDUAckClass::NACK, PDUAckType::NACK_UNDELIVERABLE, m_rfDataHeader.getNs(), m_rfDataHeader.getLLId()); + writeRF_PDU_Ack_Response(PDUAckClass::NACK, PDUAckType::NACK_UNDELIVERABLE, m_rfDataHeader.getNs(), m_rfDataHeader.getLLId(), m_rfDataHeader.getSrcLLId()); } } } @@ -390,11 +390,8 @@ bool Data::process(uint8_t* data, uint32_t len) // only repeat the PDU locally if the packet isn't for the FNE if (m_repeatPDU && m_rfDataHeader.getLLId() != WUID_FNE) { - if (m_verbose) { - LogMessage(LOG_RF, P25_PDU_STR ", repeating ACK PDU, llId = %u, srcLlId = %u", m_rfDataHeader.getLLId(), m_rfDataHeader.getSrcLLId()); - } - - writeRF_PDU_Buffered(); // re-generate buffered PDU and send it on + writeRF_PDU_Ack_Response(m_rfDataHeader.getResponseClass(), m_rfDataHeader.getResponseType(), m_rfDataHeader.getResponseStatus(), + m_rfDataHeader.getLLId(), m_rfDataHeader.getSrcLLId()); } } else { @@ -497,9 +494,6 @@ bool Data::process(uint8_t* data, uint32_t len) bool Data::processNetwork(uint8_t* data, uint32_t len, uint32_t blockLength) { - if (m_p25->m_rfState != RS_RF_LISTENING && m_p25->m_netState == RS_NET_IDLE) - return false; - if (m_p25->m_netState != RS_NET_DATA) { m_netDataHeader.reset(); m_netDataOffset = 0U; @@ -601,13 +595,8 @@ bool Data::processNetwork(uint8_t* data, uint32_t len, uint32_t blockLength) } } - if (m_repeatPDU) { - if (m_verbose) { - LogMessage(LOG_NET, P25_PDU_STR ", repeating ACK PDU, llId = %u, srcLlId = %u", m_netDataHeader.getLLId(), m_netDataHeader.getSrcLLId()); - } - - writeNet_PDU_Buffered(); // re-generate buffered PDU and send it on - } + writeRF_PDU_Ack_Response(m_netDataHeader.getResponseClass(), m_netDataHeader.getResponseType(), m_netDataHeader.getResponseStatus(), + m_netDataHeader.getLLId(), m_netDataHeader.getSrcLLId()); m_netDataHeader.reset(); m_netExtendedAddress = false; @@ -1439,7 +1428,7 @@ void Data::writeRF_PDU(const uint8_t* pdu, uint32_t bitLength, bool noNulls, boo // Add status bits P25Utils::addStatusBits(data + 2U, newBitLength, false); - P25Utils::addIdleStatusBits(data + 2U, newBitLength); + P25Utils::addTrunkSlotStatusBits(data + 2U, newBitLength); // Set first busy bits to 1,1 P25Utils::setStatusBits(data + 2U, P25_SS0_START, true, true); @@ -1697,13 +1686,11 @@ void Data::writeRF_PDU_Ack_Response(uint8_t ackClass, uint8_t ackType, uint8_t a rspHeader.setResponseType(ackType); rspHeader.setResponseStatus(ackStatus); rspHeader.setLLId(llId); - if (m_rfDataHeader.getSAP() == PDUSAP::EXT_ADDR) { + if (srcLlId > 0U) { rspHeader.setSrcLLId(srcLlId); - rspHeader.setFullMessage(false); - } - else { - rspHeader.setFullMessage(true); } + rspHeader.setFullMessage(true); + rspHeader.setBlocksToFollow(0U); // Generate the PDU header and 1/2 rate Trellis @@ -1711,8 +1698,8 @@ void Data::writeRF_PDU_Ack_Response(uint8_t ackClass, uint8_t ackType, uint8_t a Utils::setBitRange(block, data, offset, P25_PDU_FEC_LENGTH_BITS); if (m_verbose) { - LogMessage(LOG_RF, P25_PDU_STR ", OSP, response, ackClass = $%02X, ackType = $%02X, llId = %u, srcLLId = %u", - rspHeader.getResponseClass(), rspHeader.getResponseType(), rspHeader.getLLId(), rspHeader.getSrcLLId()); + LogMessage(LOG_RF, P25_PDU_STR ", OSP, response, rspClass = $%02X, rspType = $%02X, rspStatus = $%02X, llId = %u, srcLLId = %u", + rspHeader.getResponseClass(), rspHeader.getResponseType(), rspHeader.getResponseStatus(), rspHeader.getLLId(), rspHeader.getSrcLLId()); } writeRF_PDU(data, bitLength, noNulls);