Skip to content

Commit

Permalink
Merge remote-tracking branch 'bertm/issue-07217' into next
Browse files Browse the repository at this point in the history
  • Loading branch information
ArneBab committed Nov 30, 2024
2 parents 8c091c6 + 1ef201f commit 67e3ff3
Showing 1 changed file with 99 additions and 108 deletions.
207 changes: 99 additions & 108 deletions src/freenet/io/comm/UdpSocketHandler.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package freenet.io.comm;

import java.io.FileDescriptor;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.DatagramSocketImpl;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.UnsupportedAddressTypeException;
import java.util.Random;

Expand All @@ -26,11 +24,13 @@
import freenet.support.Logger;
import freenet.support.io.NativeThread;
import freenet.support.transport.ip.IPUtil;
import sun.misc.Unsafe;

public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, PortForwardSensitiveSocketHandler {

private final DatagramSocket _sock;
private final InetAddress _bindTo;
private final ByteBuffer receiveBuffer = ByteBuffer.allocate(MAX_RECEIVE_SIZE);
private final DatagramChannel datagramChannel;
private final InetSocketAddress localAddress;
private final AddressTracker tracker;
private IncomingPacketFilter lowLevelFilter;
/** RNG for debugging, used with _dropProbability.
Expand All @@ -45,7 +45,6 @@ public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, Port
private static volatile boolean logDEBUG;
private boolean _isDone;
private volatile boolean _active = true;
private final int listenPort;
private final String title;
private boolean _started;
private long startTime;
Expand Down Expand Up @@ -94,70 +93,58 @@ public enum SOCKET_ADDR_PREFERENCE {
}
}

private static int getFd(DatagramSocket s) {
int ret = -1;
private static int getFd(DatagramChannel channel) {
try {
Method m = s.getClass().getDeclaredMethod("getImpl");
m.setAccessible(true);
DatagramSocketImpl impl = (DatagramSocketImpl)m.invoke(s);
Field f = DatagramSocketImpl.class.getDeclaredField("fd");
f.setAccessible(true);
FileDescriptor fdi = (FileDescriptor)f.get(impl);
f = FileDescriptor.class.getDeclaredField("fd");
f.setAccessible(true);
ret = f.getInt(fdi);
Field unsafe = Unsafe.class.getDeclaredField("theUnsafe");
unsafe.setAccessible(true);
Unsafe theUnsafe = (Unsafe) unsafe.get(null);
Field fdVal = channel.getClass().getDeclaredField("fdVal");
return theUnsafe.getInt(channel, theUnsafe.objectFieldOffset(fdVal));
} catch (Exception e) {
if (logMINOR) { // TODO: Known Java 21 problem.
Logger.warning(UdpSocketHandler.class, e.getMessage(), e);
}
Logger.error(UdpSocketHandler.class, e.getMessage(), e);
return -1;
}
return ret;
}

public static boolean setAddressPreference(DatagramSocket s, SOCKET_ADDR_PREFERENCE p) {
if(!Platform.isLinux())
return false;
int fd = getFd(s);
if(fd <= 2)
return false;
int ret = -1;
public static boolean setAddressPreference(DatagramChannel channel, SOCKET_ADDR_PREFERENCE p) {
if (!Platform.isLinux()) {
return false;
}
int fd = getFd(channel);
if (fd <= 2) {
return false;
}
try {
ret = socketOptionsHolder.setsockopt(fd, SOCKET_level.IPPROTO_IPV6.linux, p.option_name.linux, new IntByReference(p.linux).getPointer(), Native.POINTER_SIZE);
} catch(Exception e) { Logger.normal(UdpSocketHandler.class, e.getMessage(),e); } //if it fails that's fine
return (ret == 0 ? true : false);
int ret = socketOptionsHolder.setsockopt(fd, SOCKET_level.IPPROTO_IPV6.linux, p.option_name.linux, new IntByReference(p.linux).getPointer(), Native.POINTER_SIZE);
return ret == 0;
} catch (Exception e) {
Logger.normal(UdpSocketHandler.class, e.getMessage(), e);
return false;
}
}
}

public UdpSocketHandler(int listenPort, InetAddress bindto, Node node, long startupTime, String title, IOStatisticCollector collector) throws SocketException {
public UdpSocketHandler(int listenPort, InetAddress bindToAddress, Node node, long startupTime, String title, IOStatisticCollector collector) throws IOException {
this.node = node;
this.collector = collector;
this.title = title;
_bindTo = bindto;
// Keep the Updater code in, just commented out, for now
// We may want to be able to do on-line updates.
// if (Updater.hasResource()) {
// _sock = (DatagramSocket) Updater.getResource();
// } else {
this.listenPort = listenPort;
_sock = new DatagramSocket(listenPort, bindto);
int sz = _sock.getReceiveBufferSize();
if(sz < 65536) {
_sock.setReceiveBufferSize(65536);
}
localAddress = new InetSocketAddress(bindToAddress, listenPort);
datagramChannel = DatagramChannel.open()
.bind(localAddress)
.setOption(StandardSocketOptions.SO_RCVBUF, 65536)
.setOption(StandardSocketOptions.SO_REUSEADDR, true);

try {
// Exit reasonably quickly
_sock.setReuseAddress(true);
} catch (SocketException e) {
throw new RuntimeException(e);
datagramChannel.setOption(StandardSocketOptions.IP_TOS, node.getTrafficClass().value);
} catch (UnsupportedOperationException e) {
Logger.error(this, "Failed to set IP_TOS socket option", e);
}
try {
_sock.setTrafficClass(node.getTrafficClass().value);
} catch (SocketException e) {
Logger.error(this, "Failed to setTrafficClass with "+node.getTrafficClass().value,e);

boolean r = socketOptions.setAddressPreference(datagramChannel, socketOptions.SOCKET_ADDR_PREFERENCE.IPV6_PREFER_SRC_PUBLIC);
if(logMINOR) {
Logger.minor(this, "Setting IPV6_PREFER_SRC_PUBLIC for port " + listenPort + " is a " + (r ? "success" : "failure"));
}
boolean r = socketOptions.setAddressPreference(_sock, socketOptions.SOCKET_ADDR_PREFERENCE.IPV6_PREFER_SRC_PUBLIC);
if(logMINOR) Logger.minor(this, "Setting IPV6_PREFER_SRC_PUBLIC for port "+ listenPort + " is a "+(r ? "success" : "failure"));
// }

// Only used for debugging, no need to seed from Yarrow
dropRandom = node.getFastWeakRandom();
tracker = AddressTracker.create(node.getLastBootId(), node.runDir(), listenPort);
Expand All @@ -171,7 +158,7 @@ public void setLowLevelFilter(IncomingPacketFilter f) {
}

public InetAddress getBindTo() {
return _bindTo;
return localAddress.getAddress();
}

public String getTitle() {
Expand Down Expand Up @@ -211,8 +198,8 @@ public void run() { // Listen for packets
t.printStackTrace();
} catch (Throwable tt) {}
} finally {
System.err.println("run() exiting for UdpSocketHandler on port "+_sock.getLocalPort());
Logger.error(this, "run() exiting for UdpSocketHandler on port "+_sock.getLocalPort());
System.err.println("run() exiting for UdpSocketHandler on port " + localAddress.getPort());
Logger.error(this, "run() exiting for UdpSocketHandler on port " + localAddress.getPort());
synchronized (this) {
_isDone = true;
notifyAll();
Expand All @@ -221,11 +208,9 @@ public void run() { // Listen for packets
}

private void runLoop() {
byte[] buf = new byte[MAX_RECEIVE_SIZE];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
while (_active) {
try {
realRun(packet);
realRun();
} catch (Throwable t) {
System.err.println("Caught "+t);
t.printStackTrace(System.err);
Expand All @@ -234,13 +219,12 @@ private void runLoop() {
}
}

private void realRun(DatagramPacket packet) {
// Single receiving thread
boolean gotPacket = getPacket(packet);
private void realRun() {
InetSocketAddress remote = receive();
long now = System.currentTimeMillis();
if (gotPacket) {
if (remote != null) {
long startTime = System.currentTimeMillis();
Peer peer = new Peer(packet.getAddress(), packet.getPort());
Peer peer = new Peer(remote.getAddress(), remote.getPort());
tracker.receivedPacketFrom(peer);
long endTime = System.currentTimeMillis();
if(endTime - startTime > 50) {
Expand All @@ -250,13 +234,13 @@ private void realRun(DatagramPacket packet) {
if(logMINOR) Logger.minor(this, "packet creation took "+(endTime-startTime)+"ms");
}
}
byte[] data = packet.getData();
int offset = packet.getOffset();
int length = packet.getLength();

try {
if(logMINOR) Logger.minor(this, "Processing packet of length "+length+" from "+peer);
if(logMINOR) {
Logger.minor(this, "Processing packet of length " + receiveBuffer.limit() + " from " + peer);
}
startTime = System.currentTimeMillis();
lowLevelFilter.process(data, offset, length, peer, now);
lowLevelFilter.process(receiveBuffer.array(), 0, receiveBuffer.limit(), peer, now);
endTime = System.currentTimeMillis();
if(endTime - startTime > 50) {
if(endTime-startTime > 3000) {
Expand All @@ -265,8 +249,9 @@ private void realRun(DatagramPacket packet) {
if(logMINOR) Logger.minor(this, "processing packet took "+(endTime-startTime)+"ms");
}
}
if(logMINOR) Logger.minor(this,
"Successfully handled packet length " + length);
if(logMINOR) {
Logger.minor(this, "Successfully handled packet length " + receiveBuffer.limit());
}
} catch (Throwable t) {
Logger.error(this, "Caught " + t + " from "
+ lowLevelFilter, t);
Expand All @@ -278,24 +263,24 @@ private void realRun(DatagramPacket packet) {

private static final int MAX_RECEIVE_SIZE = 1500;

private boolean getPacket(DatagramPacket packet) {
private InetSocketAddress receive() {
try {
_sock.receive(packet);
InetAddress address = packet.getAddress();
boolean isLocal = !IPUtil.isValidAddress(address, false);
collector.addInfo(address, packet.getPort(),
getHeadersLength(address) + packet.getLength(), 0, isLocal);
receiveBuffer.clear();
InetSocketAddress remote = (InetSocketAddress) datagramChannel.receive(receiveBuffer);
receiveBuffer.flip();
int port = remote.getPort();
InetAddress address = remote.getAddress();
collector.addInfo(address, port, getHeadersLength(address) + receiveBuffer.limit(), 0, isLocal(address));
return remote;
} catch (SocketTimeoutException e1) {
return false;
return null;
} catch (IOException e2) {
if (!_active) { // closed, just return silently
return false;
return null;
} else {
throw new RuntimeException(e2);
}
}
if(logMINOR) Logger.minor(this, "Received packet");
return true;
}

/**
Expand All @@ -306,45 +291,43 @@ private boolean getPacket(DatagramPacket packet) {
*/
@Override
public void sendPacket(byte[] blockToSend, Peer destination, boolean allowLocalAddresses) throws LocalAddressException {
assert(blockToSend != null);
if(!_active) {
Logger.error(this, "Trying to send packet but no longer active");
// It is essential that for recording accurate AddressTracker data that we don't send any more
// packets after shutdown.
return;
}

ByteBuffer packet = ByteBuffer.wrap(blockToSend);
int port = destination.getPort();
InetAddress address;
// there should be no DNS needed here, but go ahead if we can, but complain doing it
if( destination.getAddress(false, allowLocalAddresses) == null ) {
if ((address = destination.getAddress(false, allowLocalAddresses)) == null) {
Logger.error(this, "Tried sending to destination without pre-looked up IP address(needs a real Peer.getHostname()): null:" + destination.getPort(), new Exception("error"));
if( destination.getAddress(true, allowLocalAddresses) == null ) {
if ((address = destination.getAddress(true, allowLocalAddresses)) == null) {
Logger.error(this, "Tried sending to bad destination address: null:" + destination.getPort(), new Exception("error"));
return;
}
}
if (_dropProbability > 0) {
if (dropRandom.nextInt() % _dropProbability == 0) {
Logger.normal(this, "DROPPED: " + _sock.getLocalPort() + " -> " + destination.getPort());
Logger.normal(this, "DROPPED: " + localAddress.getPort() + " -> " + destination.getPort());
return;
}
}
InetAddress address = destination.getAddress(false, allowLocalAddresses);
assert(address != null);
int port = destination.getPort();
DatagramPacket packet = new DatagramPacket(blockToSend, blockToSend.length);
packet.setAddress(address);
packet.setPort(port);

try {
_sock.send(packet);
datagramChannel.send(packet, new InetSocketAddress(address, port));
tracker.sentPacketTo(destination);
boolean isLocal = (!IPUtil.isValidAddress(address, false)) && (IPUtil.isValidAddress(address, true));
collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal);
if(logMINOR) Logger.minor(this, "Sent packet length "+blockToSend.length+" to "+address+':'+port);
collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal(address));
if (logMINOR) {
Logger.minor(this, "Sent packet length " + blockToSend.length + " to " + address + ':' + port);
}
} catch (IOException | UnsupportedAddressTypeException e) {
if(packet.getAddress() instanceof Inet6Address) {
Logger.normal(this, "Error while sending packet to IPv6 address: "+destination+": "+e);
if (address instanceof Inet6Address) {
Logger.normal(this, "Error while sending packet to IPv6 address: " + destination + ": " + e);
} else {
Logger.error(this, "Error while sending packet to " + destination+": "+e, e);
Logger.error(this, "Error while sending packet to " + destination + ": " + e, e);
}
}
}
Expand Down Expand Up @@ -402,14 +385,18 @@ public void start() {
_started = true;
startTime = System.currentTimeMillis();
}
node.getExecutor().execute(this, "UdpSocketHandler for port "+listenPort);
node.getExecutor().execute(this, "UdpSocketHandler for port " + localAddress.getPort());
}

public void close() {
Logger.normal(this, "Closing.", new Exception("error"));
synchronized (this) {
_active = false;
_sock.close();
try {
datagramChannel.close();
} catch (IOException e) {
Logger.error(this, "Error closing DatagramChannel", e);
}

if(!_started) return;
while (!_isDone) {
Expand All @@ -420,7 +407,7 @@ public void close() {
}
}
}
tracker.storeData(node.getBootId(), node.runDir(), listenPort);
tracker.storeData(node.getBootId(), node.runDir(), localAddress.getPort());
}

public int getDropProbability() {
Expand All @@ -432,12 +419,12 @@ public void setDropProbability(int dropProbability) {
}

public int getPortNumber() {
return _sock.getLocalPort();
return localAddress.getPort();
}

@Override
public String toString() {
return _sock.getLocalAddress() + ":" + _sock.getLocalPort();
return localAddress.toString();
}

@Override
Expand Down Expand Up @@ -477,4 +464,8 @@ public long getStartTime() {
return startTime;
}

private static boolean isLocal(InetAddress address) {
return address.isLinkLocalAddress() || address.isLoopbackAddress() || IPUtil.isSiteLocalAddress(address);
}

}

0 comments on commit 67e3ff3

Please sign in to comment.