Skip to content

Commit

Permalink
Simplify IO statistics collection for UdpSocketHandler
Browse files Browse the repository at this point in the history
This removes a bunch of complicated but unused code, and makes the
stats collector interface more self-explanatory.
  • Loading branch information
bertm committed Nov 30, 2024
1 parent 82f2ede commit 673fe73
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 222 deletions.
237 changes: 26 additions & 211 deletions src/freenet/io/comm/IOStatisticCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,221 +4,36 @@
package freenet.io.comm;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

import freenet.support.Logger;
import freenet.support.Logger.LogLevel;
import freenet.support.transport.ip.IPUtil;

public class IOStatisticCollector {
public static final int STATISTICS_ENTRIES = 10;
public static final int STATISTICS_DURATION_S = 30;
public static final int STATISTICS_DURATION = 1000*STATISTICS_DURATION_S;
private long lastrotate;

private static boolean logDEBUG;
private long totalbytesin;
private long totalbytesout;
private final LinkedHashMap<String, StatisticEntry> targets;
static boolean ENABLE_PER_ADDRESS_TRACKING = false;

public IOStatisticCollector() {
targets = new LinkedHashMap<String, StatisticEntry>();
// TODO: only for testing!!!!
// This should only happen once
//SNMPAgent.create();
//SNMPStarter.initialize();
logDEBUG = Logger.shouldLog(LogLevel.DEBUG, this);
}

public void addInfo(InetAddress addr, int port, int inbytes, int outbytes, boolean isLocal) {
try {
synchronized (this) {
_addInfo(addr, port, inbytes, outbytes, isLocal);
}
} catch (Throwable t) {
t.printStackTrace();
}
}

private void _addInfo(InetAddress addr, int port, int inbytes, int outbytes, boolean isLocal) {
rotate();
if(ENABLE_PER_ADDRESS_TRACKING) {
String key = addr + ":" + port;
StatisticEntry entry = targets.get(key);
if (entry == null) {
entry = new StatisticEntry();
targets.put(key, entry);
}
entry.addData(Math.max(inbytes, 0), Math.max(outbytes, 0));
}
if(!isLocal) {
synchronized(this) {
totalbytesout += Math.max(outbytes, 0);
totalbytesin += Math.max(inbytes, 0);
if(logDEBUG)
Logger.debug(IOStatisticCollector.class, "Add("+addr+":"+port+ ',' +inbytes+ ',' +outbytes+" -> "+totalbytesin+" : "+totalbytesout);
}
}
}

public void dumpInfo() {
synchronized (this) {
_dumpInfo();
}
}
private long totalBytesIn;
private long totalBytesOut;

public long[] getTotalIO() {
synchronized (this) {
return _getTotalIO();
}
}

private long[] _getTotalIO() {
long ret[] = new long[2];
synchronized(this) {
ret[0] = totalbytesout;
ret[1] = totalbytesin;
}
return ret;
}

public int[][] getTotalStatistics() {
synchronized (this) {
return _getTotalStatistics();
}
}
public void reportReceivedBytes(InetAddress addr, int bytes) {
if (isLocal(addr) || bytes <= 0) {
return;
}
synchronized (this) {
totalBytesIn += bytes;
}
}

private int[][] _getTotalStatistics() {
//String[] keys = (String[])targets.keySet().toArray();
int ret[][] = new int[STATISTICS_ENTRIES][2];
for (int i = 0 ; i < STATISTICS_ENTRIES ; i++) {
ret[i][0] = ret[i][1] = 0;
}

for (Map.Entry<String,StatisticEntry> entry : targets.entrySet()) {
int inres[] = entry.getValue().getRecieved();
int outres[] = entry.getValue().getSent();
for (int i = 0 ; i < STATISTICS_ENTRIES ; i++) {
ret[i][1] += inres[i];
ret[i][0] += outres[i];
}
}

return ret;
}

private void _dumpInfo() {
rotate();
//DateFormat df = DateFormat.getDateInstance(DateFormat.LONG, Locale.FRANCE);
//System.err.println(DateFormat.getDateInstance().format(new Date()));
System.err.println(new Date());
final double divby = STATISTICS_DURATION_S*1024;
for (Map.Entry<String,StatisticEntry> entry : targets.entrySet()) {
String key = entry.getKey();
int inres[] = entry.getValue().getRecieved();
int outres[] = entry.getValue().getSent();
System.err.print((key + " ").substring(0,22) + ": ");
int tin = 0;
int tout = 0;

for (int i = 0 ; i < inres.length ; i++) {
// in/out in 102.4 bytes (hecto-bytes)
tin += inres[i];
tout += outres[i];

int in = (int) ((tin*10.0) / (divby*(i+1)));
int out =(int) ((tout*10.0) /(divby*(i+1)));

System.err.print("i:" + (in/10) + '.' + (in%10));
System.err.print(" o:" + (out/10) + '.' + (out%10));
System.err.print(" \t");
}
System.err.println();
}
System.err.println();
}

private void rotate() {
long now = System.currentTimeMillis();
if ((now - lastrotate) >= STATISTICS_DURATION) {
lastrotate = now;
Object[] keys = targets.keySet().toArray();
if(keys == null) return; // Why aren't we iterating there ?
for(int i = 0 ; i < keys.length ; i++) {
Object key = keys[i];
if (targets.get(key).rotate() == false)
targets.remove(key);
}
// FIXME: debugging
//_dumpInfo();
}
}
public void reportSentBytes(InetAddress addr, int bytes) {
if (isLocal(addr) || bytes <= 0) {
return;
}
synchronized (this) {
totalBytesOut += bytes;
}
}


/*
* to thead each update.... heavy stuff
private class StatisticUpdater implements Runnable {
private IOStatisticCollector sc;
private String key;
private int inbytes;
private int outbytes;
public StatisticUpdater(IOStatisticCollector sc, String key,
int inbytes, int outbytes) {
this.sc = sc;
this.key = key;
this.inbytes = inbytes;
this.outbytes = outbytes;
new Thread(this, "IOStatisticCollector$StatisticUpdater").run();
}
public void run() {
}
}
*/



private static class StatisticEntry {
private int recieved[];
private int sent[];

public StatisticEntry() {
// Create a new array and clear it
recieved = new int[IOStatisticCollector.STATISTICS_ENTRIES+1];
sent = new int[IOStatisticCollector.STATISTICS_ENTRIES+1];
for (int i = 0 ; i < recieved.length ; i++) {
recieved[i] = sent[i] = 0;
}
}

public void addData(int inbytes, int outbytes) {
recieved[0] += inbytes;
sent[0] += outbytes;
}

public boolean rotate() {
boolean hasdata = false;
for (int i = recieved.length - 1 ; i > 0 ; i--) {
recieved[i] = recieved[i-1];
sent[i] = sent[i-1];
hasdata |= (recieved[i] > 0) || (sent[i] > 0);
}
recieved[0] = sent[0] = 0;
return hasdata;
}

public int[] getRecieved() {
return Arrays.copyOfRange(recieved, 1, 1 + IOStatisticCollector.STATISTICS_ENTRIES);
}

public int[] getSent() {
return Arrays.copyOfRange(sent, 1, 1 + IOStatisticCollector.STATISTICS_ENTRIES);
}

}
public synchronized long[] getTotalIO() {
return new long[]{totalBytesOut, totalBytesIn};
}

private static boolean isLocal(InetAddress address) {
return address.isLinkLocalAddress() || address.isLoopbackAddress() || IPUtil.isSiteLocalAddress(address);
}
}
16 changes: 5 additions & 11 deletions src/freenet/io/comm/UdpSocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import freenet.node.PrioRunnable;
import freenet.support.Logger;
import freenet.support.io.NativeThread;
import freenet.support.transport.ip.IPUtil;
import sun.misc.Unsafe;

public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, PortForwardSensitiveSocketHandler {
Expand All @@ -48,7 +47,7 @@ public class UdpSocketHandler implements PrioRunnable, PacketSocketHandler, Port
private final String title;
private boolean _started;
private long startTime;
private final IOStatisticCollector collector;
private final IOStatisticCollector ioStatistics;

static {
Logger.registerClass(UdpSocketHandler.class);
Expand Down Expand Up @@ -124,9 +123,9 @@ public static boolean setAddressPreference(DatagramChannel channel, SOCKET_ADDR_
}
}

public UdpSocketHandler(int listenPort, InetAddress bindToAddress, Node node, long startupTime, String title, IOStatisticCollector collector) throws IOException {
public UdpSocketHandler(int listenPort, InetAddress bindToAddress, Node node, long startupTime, String title, IOStatisticCollector ioStatistics) throws IOException {
this.node = node;
this.collector = collector;
this.ioStatistics = ioStatistics;
this.title = title;
localAddress = new InetSocketAddress(bindToAddress, listenPort);
datagramChannel = DatagramChannel.open()
Expand Down Expand Up @@ -268,9 +267,8 @@ private InetSocketAddress receive() {
receiveBuffer.clear();
InetSocketAddress remote = (InetSocketAddress) datagramChannel.receive(receiveBuffer);
receiveBuffer.flip();
int port = remote.getPort();
InetAddress address = remote.getAddress();
collector.addInfo(address, port, getHeadersLength(address) + receiveBuffer.limit(), 0, isLocal(address));
ioStatistics.reportReceivedBytes(address, getHeadersLength(address) + receiveBuffer.limit());
return remote;
} catch (SocketTimeoutException e1) {
return null;
Expand Down Expand Up @@ -319,7 +317,7 @@ public void sendPacket(byte[] blockToSend, Peer destination, boolean allowLocalA
try {
datagramChannel.send(packet, new InetSocketAddress(address, port));
tracker.sentPacketTo(destination);
collector.addInfo(address, port, 0, getHeadersLength(address) + blockToSend.length, isLocal(address));
ioStatistics.reportSentBytes(address, getHeadersLength(address) + blockToSend.length);
if (logMINOR) {
Logger.minor(this, "Sent packet length " + blockToSend.length + " to " + address + ':' + port);
}
Expand Down Expand Up @@ -464,8 +462,4 @@ public long getStartTime() {
return startTime;
}

private static boolean isLocal(InetAddress address) {
return address.isLinkLocalAddress() || address.isLoopbackAddress() || IPUtil.isSiteLocalAddress(address);
}

}

0 comments on commit 673fe73

Please sign in to comment.