Skip to content

Commit

Permalink
Support IPV6_ADDR_PREFERENCES until Java 24 and beyond
Browse files Browse the repository at this point in the history
While reflection on Java's internals is slowly being crippled, Unsafe
access is still possible (at least until JEP 417 [0] reaches phase 3 in
Java 26 or later).

This requires moving from DatagramSocket to DatagramChannel as the
former is substantially rewritten in Java 15 (JEP 373, see [1]) as a
wrapper around the latter. The channel-based socket cannot be used in
Java versions prior to 14 due to Java bug JDK-8232673 [2] which prevents
read/write concurrency, effectively stalling the node.

Tested on OpenJDK 8, 11, 17, 21, 23, and 24.
Fixes bug 07217.

[0] https://openjdk.org/jeps/471
[1] https://openjdk.org/jeps/373
[2] https://bugs.openjdk.org/browse/JDK-8232673
  • Loading branch information
bertm committed Nov 30, 2024
1 parent ed64823 commit 7ce3c40
Showing 1 changed file with 99 additions and 108 deletions.
207 changes: 99 additions & 108 deletions src/freenet/io/comm/UdpSocketHandler.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package freenet.io.comm;

import java.io.FileDescriptor;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.DatagramSocketImpl;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.UnsupportedAddressTypeException;
import java.util.Random;

Expand All @@ -26,11 +24,13 @@
import freenet.support.Logger;
import freenet.support.io.NativeThread;
import freenet.support.transport.ip.IPUtil;
import sun.misc.Unsafe;

public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, PortForwardSensitiveSocketHandler {

private final DatagramSocket _sock;
private final InetAddress _bindTo;
private final ByteBuffer receiveBuffer = ByteBuffer.allocate(MAX_RECEIVE_SIZE);
private final DatagramChannel datagramChannel;
private final InetSocketAddress localAddress;
private final AddressTracker tracker;
private IncomingPacketFilter lowLevelFilter;
/** RNG for debugging, used with _dropProbability.
Expand All @@ -45,7 +45,6 @@ public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, Port
private static volatile boolean logDEBUG;
private boolean _isDone;
private volatile boolean _active = true;
private final int listenPort;
private final String title;
private boolean _started;
private long startTime;
Expand Down Expand Up @@ -94,70 +93,58 @@ public enum SOCKET_ADDR_PREFERENCE {
}
}

private static int getFd(DatagramSocket s) {
int ret = -1;
private static int getFd(DatagramChannel channel) {
try {
Method m = s.getClass().getDeclaredMethod("getImpl");
m.setAccessible(true);
DatagramSocketImpl impl = (DatagramSocketImpl)m.invoke(s);
Field f = DatagramSocketImpl.class.getDeclaredField("fd");
f.setAccessible(true);
FileDescriptor fdi = (FileDescriptor)f.get(impl);
f = FileDescriptor.class.getDeclaredField("fd");
f.setAccessible(true);
ret = f.getInt(fdi);
Field unsafe = Unsafe.class.getDeclaredField("theUnsafe");
unsafe.setAccessible(true);
Unsafe theUnsafe = (Unsafe) unsafe.get(null);
Field fdVal = channel.getClass().getDeclaredField("fdVal");
return theUnsafe.getInt(channel, theUnsafe.objectFieldOffset(fdVal));
} catch (Exception e) {
if (logMINOR) { // TODO: Known Java 21 problem.
Logger.warning(UdpSocketHandler.class, e.getMessage(), e);
}
Logger.warning(UdpSocketHandler.class, e.getMessage(), e);
return -1;
}
return ret;
}

public static boolean setAddressPreference(DatagramSocket s, SOCKET_ADDR_PREFERENCE p) {
if(!Platform.isLinux())
return false;
int fd = getFd(s);
if(fd <= 2)
return false;
int ret = -1;
public static boolean setAddressPreference(DatagramChannel channel, SOCKET_ADDR_PREFERENCE p) {
if (!Platform.isLinux()) {
return false;
}
int fd = getFd(channel);
if (fd <= 2) {
return false;
}
try {
ret = socketOptionsHolder.setsockopt(fd, SOCKET_level.IPPROTO_IPV6.linux, p.option_name.linux, new IntByReference(p.linux).getPointer(), Native.POINTER_SIZE);
} catch(Exception e) { Logger.normal(UdpSocketHandler.class, e.getMessage(),e); } //if it fails that's fine
return (ret == 0 ? true : false);
int ret = socketOptionsHolder.setsockopt(fd, SOCKET_level.IPPROTO_IPV6.linux, p.option_name.linux, new IntByReference(p.linux).getPointer(), Native.POINTER_SIZE);
return ret == 0;
} catch (Exception e) {
Logger.normal(UdpSocketHandler.class, e.getMessage(), e);
return false;
}
}
}

public UdpSocketHandler(int listenPort, InetAddress bindto, Node node, long startupTime, String title, IOStatisticCollector collector) throws SocketException {
public UdpSocketHandler(int listenPort, InetAddress bindToAddress, Node node, long startupTime, String title, IOStatisticCollector collector) throws IOException {
this.node = node;
this.collector = collector;
this.title = title;
_bindTo = bindto;
// Keep the Updater code in, just commented out, for now
// We may want to be able to do on-line updates.
// if (Updater.hasResource()) {
// _sock = (DatagramSocket) Updater.getResource();
// } else {
this.listenPort = listenPort;
_sock = new DatagramSocket(listenPort, bindto);
int sz = _sock.getReceiveBufferSize();
if(sz < 65536) {
_sock.setReceiveBufferSize(65536);
}
localAddress = new InetSocketAddress(bindToAddress, listenPort);
datagramChannel = DatagramChannel.open()
.bind(localAddress)
.setOption(StandardSocketOptions.SO_RCVBUF, 65536)
.setOption(StandardSocketOptions.SO_REUSEADDR, true);

try {
// Exit reasonably quickly
_sock.setReuseAddress(true);
} catch (SocketException e) {
throw new RuntimeException(e);
datagramChannel.setOption(StandardSocketOptions.IP_TOS, node.getTrafficClass().value);
} catch (UnsupportedOperationException e) {
Logger.error(this, "Failed to set IP_TOS socket option", e);
}
try {
_sock.setTrafficClass(node.getTrafficClass().value);
} catch (SocketException e) {
Logger.error(this, "Failed to setTrafficClass with "+node.getTrafficClass().value,e);

boolean r = socketOptions.setAddressPreference(datagramChannel, socketOptions.SOCKET_ADDR_PREFERENCE.IPV6_PREFER_SRC_PUBLIC);
if(logMINOR) {
Logger.minor(this, "Setting IPV6_PREFER_SRC_PUBLIC for port " + listenPort + " is a " + (r ? "success" : "failure"));
}
boolean r = socketOptions.setAddressPreference(_sock, socketOptions.SOCKET_ADDR_PREFERENCE.IPV6_PREFER_SRC_PUBLIC);
if(logMINOR) Logger.minor(this, "Setting IPV6_PREFER_SRC_PUBLIC for port "+ listenPort + " is a "+(r ? "success" : "failure"));
// }

// Only used for debugging, no need to seed from Yarrow
dropRandom = node.getFastWeakRandom();
tracker = AddressTracker.create(node.getLastBootId(), node.runDir(), listenPort);
Expand All @@ -171,7 +158,7 @@ public void setLowLevelFilter(IncomingPacketFilter f) {
}

public InetAddress getBindTo() {
return _bindTo;
return localAddress.getAddress();
}

public String getTitle() {
Expand Down Expand Up @@ -211,8 +198,8 @@ public void run() { // Listen for packets
t.printStackTrace();
} catch (Throwable tt) {}
} finally {
System.err.println("run() exiting for UdpSocketHandler on port "+_sock.getLocalPort());
Logger.error(this, "run() exiting for UdpSocketHandler on port "+_sock.getLocalPort());
System.err.println("run() exiting for UdpSocketHandler on port " + localAddress.getPort());
Logger.error(this, "run() exiting for UdpSocketHandler on port " + localAddress.getPort());
synchronized (this) {
_isDone = true;
notifyAll();
Expand All @@ -221,11 +208,9 @@ public void run() { // Listen for packets
}

private void runLoop() {
byte[] buf = new byte[MAX_RECEIVE_SIZE];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
while (_active) {
try {
realRun(packet);
realRun();
} catch (Throwable t) {
System.err.println("Caught "+t);
t.printStackTrace(System.err);
Expand All @@ -234,13 +219,12 @@ private void runLoop() {
}
}

private void realRun(DatagramPacket packet) {
// Single receiving thread
boolean gotPacket = getPacket(packet);
private void realRun() {
InetSocketAddress remote = receive();
long now = System.currentTimeMillis();
if (gotPacket) {
if (remote != null) {
long startTime = System.currentTimeMillis();
Peer peer = new Peer(packet.getAddress(), packet.getPort());
Peer peer = new Peer(remote.getAddress(), remote.getPort());
tracker.receivedPacketFrom(peer);
long endTime = System.currentTimeMillis();
if(endTime - startTime > 50) {
Expand All @@ -250,13 +234,13 @@ private void realRun(DatagramPacket packet) {
if(logMINOR) Logger.minor(this, "packet creation took "+(endTime-startTime)+"ms");
}
}
byte[] data = packet.getData();
int offset = packet.getOffset();
int length = packet.getLength();

try {
if(logMINOR) Logger.minor(this, "Processing packet of length "+length+" from "+peer);
if(logMINOR) {
Logger.minor(this, "Processing packet of length " + receiveBuffer.limit() + " from " + peer);
}
startTime = System.currentTimeMillis();
lowLevelFilter.process(data, offset, length, peer, now);
lowLevelFilter.process(receiveBuffer.array(), 0, receiveBuffer.limit(), peer, now);
endTime = System.currentTimeMillis();
if(endTime - startTime > 50) {
if(endTime-startTime > 3000) {
Expand All @@ -265,8 +249,9 @@ private void realRun(DatagramPacket packet) {
if(logMINOR) Logger.minor(this, "processing packet took "+(endTime-startTime)+"ms");
}
}
if(logMINOR) Logger.minor(this,
"Successfully handled packet length " + length);
if(logMINOR) {
Logger.minor(this, "Successfully handled packet length " + receiveBuffer.limit());
}
} catch (Throwable t) {
Logger.error(this, "Caught " + t + " from "
+ lowLevelFilter, t);
Expand All @@ -278,24 +263,24 @@ private void realRun(DatagramPacket packet) {

private static final int MAX_RECEIVE_SIZE = 1500;

private boolean getPacket(DatagramPacket packet) {
private InetSocketAddress receive() {
try {
_sock.receive(packet);
InetAddress address = packet.getAddress();
boolean isLocal = !IPUtil.isValidAddress(address, false);
collector.addInfo(address, packet.getPort(),
getHeadersLength(address) + packet.getLength(), 0, isLocal);
receiveBuffer.clear();
InetSocketAddress remote = (InetSocketAddress) datagramChannel.receive(receiveBuffer);
receiveBuffer.flip();
int port = remote.getPort();
InetAddress address = remote.getAddress();
collector.addInfo(address, port, getHeadersLength(address) + receiveBuffer.limit(), 0, isLocal(address));
return remote;
} catch (SocketTimeoutException e1) {
return false;
return null;
} catch (IOException e2) {
if (!_active) { // closed, just return silently
return false;
return null;
} else {
throw new RuntimeException(e2);
}
}
if(logMINOR) Logger.minor(this, "Received packet");
return true;
}

/**
Expand All @@ -306,45 +291,43 @@ private boolean getPacket(DatagramPacket packet) {
*/
@Override
public void sendPacket(byte[] blockToSend, Peer destination, boolean allowLocalAddresses) throws LocalAddressException {
assert(blockToSend != null);
if(!_active) {
Logger.error(this, "Trying to send packet but no longer active");
// It is essential that for recording accurate AddressTracker data that we don't send any more
// packets after shutdown.
return;
}

ByteBuffer packet = ByteBuffer.wrap(blockToSend);
int port = destination.getPort();
InetAddress address;
// there should be no DNS needed here, but go ahead if we can, but complain doing it
if( destination.getAddress(false, allowLocalAddresses) == null ) {
if ((address = destination.getAddress(false, allowLocalAddresses)) == null) {
Logger.error(this, "Tried sending to destination without pre-looked up IP address(needs a real Peer.getHostname()): null:" + destination.getPort(), new Exception("error"));
if( destination.getAddress(true, allowLocalAddresses) == null ) {
if ((address = destination.getAddress(true, allowLocalAddresses)) == null) {
Logger.error(this, "Tried sending to bad destination address: null:" + destination.getPort(), new Exception("error"));
return;
}
}
if (_dropProbability > 0) {
if (dropRandom.nextInt() % _dropProbability == 0) {
Logger.normal(this, "DROPPED: " + _sock.getLocalPort() + " -> " + destination.getPort());
Logger.normal(this, "DROPPED: " + localAddress.getPort() + " -> " + destination.getPort());
return;
}
}
InetAddress address = destination.getAddress(false, allowLocalAddresses);
assert(address != null);
int port = destination.getPort();
DatagramPacket packet = new DatagramPacket(blockToSend, blockToSend.length);
packet.setAddress(address);
packet.setPort(port);

try {
_sock.send(packet);
datagramChannel.send(packet, new InetSocketAddress(address, port));
tracker.sentPacketTo(destination);
boolean isLocal = (!IPUtil.isValidAddress(address, false)) && (IPUtil.isValidAddress(address, true));
collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal);
if(logMINOR) Logger.minor(this, "Sent packet length "+blockToSend.length+" to "+address+':'+port);
collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal(address));
if (logMINOR) {
Logger.minor(this, "Sent packet length " + blockToSend.length + " to " + address + ':' + port);
}
} catch (IOException | UnsupportedAddressTypeException e) {
if(packet.getAddress() instanceof Inet6Address) {
Logger.normal(this, "Error while sending packet to IPv6 address: "+destination+": "+e);
if (address instanceof Inet6Address) {
Logger.normal(this, "Error while sending packet to IPv6 address: " + destination + ": " + e);
} else {
Logger.error(this, "Error while sending packet to " + destination+": "+e, e);
Logger.error(this, "Error while sending packet to " + destination + ": " + e, e);
}
}
}
Expand Down Expand Up @@ -402,14 +385,18 @@ public void start() {
_started = true;
startTime = System.currentTimeMillis();
}
node.getExecutor().execute(this, "UdpSocketHandler for port "+listenPort);
node.getExecutor().execute(this, "UdpSocketHandler for port " + localAddress.getPort());
}

public void close() {
Logger.normal(this, "Closing.", new Exception("error"));
synchronized (this) {
_active = false;
_sock.close();
try {
datagramChannel.close();
} catch (IOException e) {
Logger.error(this, "Error closing DatagramChannel", e);
}

if(!_started) return;
while (!_isDone) {
Expand All @@ -420,7 +407,7 @@ public void close() {
}
}
}
tracker.storeData(node.getBootId(), node.runDir(), listenPort);
tracker.storeData(node.getBootId(), node.runDir(), localAddress.getPort());
}

public int getDropProbability() {
Expand All @@ -432,12 +419,12 @@ public void setDropProbability(int dropProbability) {
}

public int getPortNumber() {
return _sock.getLocalPort();
return localAddress.getPort();
}

@Override
public String toString() {
return _sock.getLocalAddress() + ":" + _sock.getLocalPort();
return localAddress.toString();
}

@Override
Expand Down Expand Up @@ -477,4 +464,8 @@ public long getStartTime() {
return startTime;
}

private static boolean isLocal(InetAddress address) {
return address.isLinkLocalAddress() || address.isLoopbackAddress() || IPUtil.isSiteLocalAddress(address);
}

}

0 comments on commit 7ce3c40

Please sign in to comment.