From f7fa0c490f2a7d8a20ec8c23cbb95229a59298fb Mon Sep 17 00:00:00 2001 From: Bert Massop Date: Sun, 17 Nov 2024 18:37:45 +0100 Subject: [PATCH] Remove code related to NLM load sending This code was effectively inactive since 2013 (which is when the FIXME in PeerNode.makeLoadStats was added: "re-enable when try NLM again"). Removing the related code not only gets rid of a few hundred lines of code, but also slightly reduces the (huge) number of things PeerNode needs to keep track of and simplifies the NPF logic substantially. Note: this does not remove receiving of load stats from peer nodes. --- src/freenet/io/comm/DMT.java | 71 +---- src/freenet/io/comm/Message.java | 24 +- src/freenet/node/BasePeerNode.java | 14 - src/freenet/node/MessageItem.java | 8 +- src/freenet/node/NewPacketFormat.java | 281 ++++++-------------- src/freenet/node/NodeDispatcher.java | 3 - src/freenet/node/NodeStats.java | 27 +- src/freenet/node/PeerMessageQueue.java | 64 +---- src/freenet/node/PeerNode.java | 158 +---------- test/freenet/node/MessageWrapperTest.java | 13 +- test/freenet/node/NewPacketFormatTest.java | 190 +------------ test/freenet/node/NullBasePeerNode.java | 18 +- test/freenet/node/PeerMessageQueueTest.java | 14 +- 13 files changed, 144 insertions(+), 741 deletions(-) diff --git a/src/freenet/io/comm/DMT.java b/src/freenet/io/comm/DMT.java index e35ceffaecf..12f0178b442 100644 --- a/src/freenet/io/comm/DMT.java +++ b/src/freenet/io/comm/DMT.java @@ -23,7 +23,6 @@ import freenet.keys.Key; import freenet.keys.NodeCHK; import freenet.keys.NodeSSK; -import freenet.node.NodeStats.PeerLoadStats; import freenet.node.probe.Error; import freenet.node.probe.Type; import freenet.support.BitArray; @@ -474,17 +473,19 @@ public static Message createFNPRejectedLoop(long id) { addField(UID, Long.class); addField(IS_LOCAL, Boolean.class); }}; - + + /** + * @deprecated last two NLM related arguments are ignored and can be omitted + */ + @Deprecated public static Message createFNPRejectedOverload(long id, boolean isLocal, boolean needsLoad, boolean realTimeFlag) { + return createFNPRejectedOverload(id, isLocal); + } + + public static Message createFNPRejectedOverload(long id, boolean isLocal) { Message msg = new Message(FNPRejectedOverload); msg.set(UID, id); msg.set(IS_LOCAL, isLocal); - if(needsLoad) { - if(realTimeFlag) - msg.setNeedsLoadRT(); - else - msg.setNeedsLoadBulk(); - } return msg; } @@ -1723,60 +1724,6 @@ public static Message createFNPRejectIsSoft() { addField(REAL_TIME_FLAG, Boolean.class); }}; - public static Message createFNPPeerLoadStatus(PeerLoadStats stats) { - Message msg; - if(stats.expectedTransfersInCHK < 256 && stats.expectedTransfersInSSK < 256 && - stats.expectedTransfersOutCHK < 256 && stats.expectedTransfersOutSSK < 256 && - stats.averageTransfersOutPerInsert < 256 && stats.maxTransfersOut < 256 && - stats.maxTransfersOutLowerLimit < 256 && stats.maxTransfersOutPeerLimit < 256 && - stats.maxTransfersOutUpperLimit < 256) { - msg = new Message(FNPPeerLoadStatusByte); - msg.set(OTHER_TRANSFERS_OUT_CHK, (byte)stats.expectedTransfersOutCHK); - msg.set(OTHER_TRANSFERS_IN_CHK, (byte)stats.expectedTransfersInCHK); - msg.set(OTHER_TRANSFERS_OUT_SSK, (byte)stats.expectedTransfersOutSSK); - msg.set(OTHER_TRANSFERS_IN_SSK, (byte)stats.expectedTransfersInSSK); - msg.set(AVERAGE_TRANSFERS_OUT_PER_INSERT, (byte)stats.averageTransfersOutPerInsert); - msg.set(MAX_TRANSFERS_OUT, (byte)stats.maxTransfersOut); - msg.set(MAX_TRANSFERS_OUT_PEER_LIMIT, (byte)stats.maxTransfersOutPeerLimit); - msg.set(MAX_TRANSFERS_OUT_LOWER_LIMIT, (byte)stats.maxTransfersOutLowerLimit); - msg.set(MAX_TRANSFERS_OUT_UPPER_LIMIT, (byte)stats.maxTransfersOutUpperLimit); - } else if(stats.expectedTransfersInCHK < 65536 && stats.expectedTransfersInSSK < 65536 && - stats.expectedTransfersOutCHK < 65536 && stats.expectedTransfersOutSSK < 65536 && - stats.averageTransfersOutPerInsert < 65536 && stats.maxTransfersOut < 65536 && - stats.maxTransfersOutLowerLimit < 65536 && stats.maxTransfersOutPeerLimit < 65536 && - stats.maxTransfersOutUpperLimit < 65536) { - msg = new Message(FNPPeerLoadStatusShort); - msg.set(OTHER_TRANSFERS_OUT_CHK, (short)stats.expectedTransfersOutCHK); - msg.set(OTHER_TRANSFERS_IN_CHK, (short)stats.expectedTransfersInCHK); - msg.set(OTHER_TRANSFERS_OUT_SSK, (short)stats.expectedTransfersOutSSK); - msg.set(OTHER_TRANSFERS_IN_SSK, (short)stats.expectedTransfersInSSK); - msg.set(AVERAGE_TRANSFERS_OUT_PER_INSERT, (short)stats.averageTransfersOutPerInsert); - msg.set(MAX_TRANSFERS_OUT, (short)stats.maxTransfersOut); - msg.set(MAX_TRANSFERS_OUT_PEER_LIMIT, (short)stats.maxTransfersOutPeerLimit); - msg.set(MAX_TRANSFERS_OUT_LOWER_LIMIT, (short)stats.maxTransfersOutLowerLimit); - msg.set(MAX_TRANSFERS_OUT_UPPER_LIMIT, (short)stats.maxTransfersOutUpperLimit); - } else { - msg = new Message(FNPPeerLoadStatusInt); - msg.set(OTHER_TRANSFERS_OUT_CHK, stats.expectedTransfersOutCHK); - msg.set(OTHER_TRANSFERS_IN_CHK, stats.expectedTransfersInCHK); - msg.set(OTHER_TRANSFERS_OUT_SSK, stats.expectedTransfersOutSSK); - msg.set(OTHER_TRANSFERS_IN_SSK, stats.expectedTransfersInSSK); - msg.set(AVERAGE_TRANSFERS_OUT_PER_INSERT, stats.averageTransfersOutPerInsert); - msg.set(MAX_TRANSFERS_OUT, stats.maxTransfersOut); - msg.set(MAX_TRANSFERS_OUT_PEER_LIMIT, stats.maxTransfersOutPeerLimit); - msg.set(MAX_TRANSFERS_OUT_LOWER_LIMIT, stats.maxTransfersOutLowerLimit); - msg.set(MAX_TRANSFERS_OUT_UPPER_LIMIT, stats.maxTransfersOutUpperLimit); - } - msg.set(OUTPUT_BANDWIDTH_LOWER_LIMIT, (int)stats.outputBandwidthLowerLimit); - msg.set(OUTPUT_BANDWIDTH_UPPER_LIMIT, (int)stats.outputBandwidthUpperLimit); - msg.set(OUTPUT_BANDWIDTH_PEER_LIMIT, (int)stats.outputBandwidthPeerLimit); - msg.set(INPUT_BANDWIDTH_LOWER_LIMIT, (int)stats.inputBandwidthLowerLimit); - msg.set(INPUT_BANDWIDTH_UPPER_LIMIT, (int)stats.inputBandwidthUpperLimit); - msg.set(INPUT_BANDWIDTH_PEER_LIMIT, (int)stats.inputBandwidthPeerLimit); - msg.set(REAL_TIME_FLAG, stats.realTime); - return msg; - } - public static final String AVERAGE_TRANSFERS_OUT_PER_INSERT = "averageTransfersOutPerInsert"; public static final String OTHER_TRANSFERS_OUT_CHK = "otherTransfersOutCHK"; diff --git a/src/freenet/io/comm/Message.java b/src/freenet/io/comm/Message.java index 5aadf7491b5..6be944e7a21 100644 --- a/src/freenet/io/comm/Message.java +++ b/src/freenet/io/comm/Message.java @@ -33,9 +33,9 @@ import freenet.support.Fields; import freenet.support.LogThresholdCallback; import freenet.support.Logger; +import freenet.support.Logger.LogLevel; import freenet.support.Serializer; import freenet.support.ShortBuffer; -import freenet.support.Logger.LogLevel; /** * A Message which can be read from and written to a DatagramPacket. @@ -75,9 +75,7 @@ public void shouldUpdate(){ public final long localInstantiationTime; final int _receivedByteCount; short priority; - private boolean needsLoadRT; - private boolean needsLoadBulk; - + public static Message decodeMessageFromPacket(byte[] buf, int offset, int length, PeerContext peer, int overhead) { ByteBufferInputStream bb = new ByteBufferInputStream(buf, offset, length); return decodeMessage(bb, peer, length + overhead, true, false, false); @@ -179,8 +177,6 @@ private Message(Message m) { localInstantiationTime = System.currentTimeMillis(); _receivedByteCount = 0; priority = m.priority; - needsLoadRT = m.needsLoadRT; - needsLoadBulk = m.needsLoadBulk; } public boolean getBoolean(String key) { @@ -406,22 +402,6 @@ public void boostPriority() { priority--; } - public boolean needsLoadRT() { - return needsLoadRT; - } - - public boolean needsLoadBulk() { - return needsLoadBulk; - } - - public void setNeedsLoadRT() { - needsLoadRT = true; - } - - public void setNeedsLoadBulk() { - needsLoadBulk = true; - } - /** Clone the message, clear sub-messages and set originator to self. */ public Message cloneAndDropSubMessages() { return new Message(this); diff --git a/src/freenet/node/BasePeerNode.java b/src/freenet/node/BasePeerNode.java index cdb0544684c..a037e54aa13 100644 --- a/src/freenet/node/BasePeerNode.java +++ b/src/freenet/node/BasePeerNode.java @@ -60,20 +60,6 @@ public interface BasePeerNode extends PeerContext { void handleMessage(Message msg); - /** Make a load stats message. - * @param realtime True for the realtime load stats, false for the bulk load stats. - * @param highPriority If true, boost the priority so it gets sent fast. - * @param noRemember If true, generating it for a lossy message in a packet; don't - * remember that we sent it, since it might be lost, and generate it even if the last - * one was the same, since the last one might be delayed. */ - MessageItem makeLoadStats(boolean realtime, boolean highPriority, boolean noRemember); - - boolean grabSendLoadStatsASAP(boolean realtime); - - /** Set the load stats to be sent asap. E.g. if we grabbed it and can't actually - * execute the send for some reason. */ - void setSendLoadStatsASAP(boolean realtime); - /** Average ping time incorporating variance, calculated like TCP SRTT, as with RFC 2988. */ double averagePingTimeCorrected(); diff --git a/src/freenet/node/MessageItem.java b/src/freenet/node/MessageItem.java index e5ad4cf0cea..1f55e17535c 100644 --- a/src/freenet/node/MessageItem.java +++ b/src/freenet/node/MessageItem.java @@ -26,8 +26,6 @@ public class MessageItem { private final short priority; private long cachedID; private boolean hasCachedID; - final boolean sendLoadRT; - final boolean sendLoadBulk; private long deadline; public MessageItem(Message msg2, AsyncMessageCallback[] cb2, ByteCounter ctr, short overridePriority) { @@ -40,8 +38,6 @@ public MessageItem(Message msg2, AsyncMessageCallback[] cb2, ByteCounter ctr, sh priority = overridePriority; else priority = msg2.getPriority(); - this.sendLoadRT = msg2.needsLoadRT(); - this.sendLoadBulk = msg2.needsLoadBulk(); buf = msg.encodeToPacket(); if(buf.length > NewPacketFormat.MAX_MESSAGE_SIZE) { // This is bad because fairness between UID's happens at the level of message queueing, @@ -56,7 +52,7 @@ public MessageItem(Message msg2, AsyncMessageCallback[] cb2, ByteCounter ctr) { this(msg2, cb2, ctr, (short)-1); } - public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean formatted, ByteCounter ctr, short priority, boolean sendLoadRT, boolean sendLoadBulk) { + public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean formatted, ByteCounter ctr, short priority) { this.cb = cb2; this.msg = null; this.buf = data; @@ -66,8 +62,6 @@ public MessageItem(byte[] data, AsyncMessageCallback[] cb2, boolean formatted, B this.ctrCallback = ctr; this.submitted = System.currentTimeMillis(); this.priority = priority; - this.sendLoadRT = sendLoadRT; - this.sendLoadBulk = sendLoadBulk; } /** diff --git a/src/freenet/node/NewPacketFormat.java b/src/freenet/node/NewPacketFormat.java index 9f40f51428e..ae5ab95e202 100644 --- a/src/freenet/node/NewPacketFormat.java +++ b/src/freenet/node/NewPacketFormat.java @@ -3,6 +3,8 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.node; +import static java.util.concurrent.TimeUnit.MINUTES; + import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; @@ -27,8 +29,6 @@ import freenet.support.Logger.LogLevel; import freenet.support.SparseBitmap; -import static java.util.concurrent.TimeUnit.MINUTES; - public class NewPacketFormat implements PacketFormat { private static final int HMAC_LENGTH = 10; @@ -590,71 +590,28 @@ NPFPacket createPacket(int maxPacketSize, PeerMessageQueue messageQueue, Session if(logDEBUG) Logger.debug(this, "Added acks for "+this+" for "+pn.shortToString()); } - byte[] haveAddedStatsBulk = null; - byte[] haveAddedStatsRT = null; - if(!ackOnly) { boolean addedFragments = false; - while(true) { - - boolean addStatsBulk = false; - boolean addStatsRT = false; - - synchronized(sendBufferLock) { - // Always finish what we have started before considering sending more packets. - // Anything beyond this is beyond the scope of NPF and is PeerMessageQueue's job. -addOldLoop: for(Map started : startedByPrio) { - //Try to finish messages that have been started - Iterator it = started.values().iterator(); - while(it.hasNext() && packet.getLength() < maxPacketSize) { - MessageWrapper wrapper = it.next(); - while(packet.getLength() < maxPacketSize) { - MessageFragment frag = wrapper.getMessageFragment(maxPacketSize - packet.getLength()); - if(frag == null) break; - mustSend = true; - addedFragments = true; - packet.addMessageFragment(frag); - sentPacket.addFragment(frag); - if(wrapper.allSent()) { - if((haveAddedStatsBulk == null) && wrapper.getItem().sendLoadBulk) { - addStatsBulk = true; - // Add the lossy message outside the lock. - break addOldLoop; - } - if((haveAddedStatsRT == null) && wrapper.getItem().sendLoadRT) { - addStatsRT = true; - // Add the lossy message outside the lock. - break addOldLoop; - } - } - } + synchronized(sendBufferLock) { + // Always finish what we have started before considering sending more packets. + // Anything beyond this is beyond the scope of NPF and is PeerMessageQueue's job. + for (Map started : startedByPrio) { + //Try to finish messages that have been started + Iterator it = started.values().iterator(); + while (it.hasNext() && packet.getLength() < maxPacketSize) { + MessageWrapper wrapper = it.next(); + while (packet.getLength() < maxPacketSize) { + MessageFragment frag = wrapper.getMessageFragment(maxPacketSize - packet.getLength()); + if (frag == null) break; + mustSend = true; + addedFragments = true; + packet.addMessageFragment(frag); + sentPacket.addFragment(frag); } } } - - if(!(addStatsBulk || addStatsRT)) break; - - if(addStatsBulk) { - MessageItem item = pn.makeLoadStats(false, false, true); - if(item != null) { - byte[] buf = item.getData(); - haveAddedStatsBulk = buf; - // FIXME if this fails, drop some messages. - packet.addLossyMessage(buf, maxPacketSize); - } - } - - if(addStatsRT) { - MessageItem item = pn.makeLoadStats(true, false, true); - if(item != null) { - byte[] buf = item.getData(); - haveAddedStatsRT = buf; - // FIXME if this fails, drop some messages. - packet.addLossyMessage(buf, maxPacketSize); - } - } } if(addedFragments) { @@ -720,164 +677,78 @@ NPFPacket createPacket(int maxPacketSize, PeerMessageQueue messageQueue, Session return null; } - boolean sendStatsBulk = false, sendStatsRT = false; - - if(!ackOnly) { - - sendStatsBulk = pn.grabSendLoadStatsASAP(false); - sendStatsRT = pn.grabSendLoadStatsASAP(true); - - if(sendStatsBulk || sendStatsRT) { - if(!checkedCanSend) - cantSend = !canSend(sessionKey); - checkedCanSend = true; - if(cantSend) { - if(sendStatsBulk) - pn.setSendLoadStatsASAP(false); - if(sendStatsRT) - pn.setSendLoadStatsASAP(true); - } else { - mustSend = true; - } - } - } - if(ackOnly && numAcks == 0) return null; if((!ackOnly) && (!cantSend)) { - - if(sendStatsBulk) { - MessageItem item = pn.makeLoadStats(false, true, false); - if(item != null) { - if(haveAddedStatsBulk != null) { - packet.removeLossyMessage(haveAddedStatsBulk); - } - messageQueue.pushfrontPrioritizedMessageItem(item); - haveAddedStatsBulk = item.buf; - } - } - - if(sendStatsRT) { - MessageItem item = pn.makeLoadStats(true, true, false); - if(item != null) { - if(haveAddedStatsRT != null) { - packet.removeLossyMessage(haveAddedStatsRT); - } - messageQueue.pushfrontPrioritizedMessageItem(item); - haveAddedStatsRT = item.buf; - } - } - + fragments: for(int i = 0; i < startedByPrio.size(); i++) { + //Add messages from the message queue + while ((packet.getLength() + 10) < maxPacketSize) { //Fragment header is max 9 bytes, allow min 1 byte data - prio: - while(true) { - - boolean addStatsBulk = false; - boolean addStatsRT = false; - - //Add messages from the message queue - while ((packet.getLength() + 10) < maxPacketSize) { //Fragment header is max 9 bytes, allow min 1 byte data - - if(!checkedCanSend) { - // Check in advance to avoid reordering message items. - cantSend = !canSend(sessionKey); - } - checkedCanSend = false; - if(cantSend) break; - boolean wasGeneratedPing = false; - - MessageItem item = messageQueue.grabQueuedMessageItem(i); - if(item == null) { - if(mustSendKeepalive && packet.noFragments()) { - // Create a ping for keepalive purposes. - // It will be acked, this ensures both sides don't timeout. - Message msg; - synchronized(this) { - msg = DMT.createFNPPing(pingCounter++); - } - item = new MessageItem(msg, null, null); - item.setDeadline(now + PacketSender.MAX_COALESCING_DELAY); - wasGeneratedPing = true; - // Should we report this on the PeerNode's stats? We'd need to run a job off-thread, so probably not worth it. - } else { - break prio; - } - } - - int messageID = getMessageID(); - if(messageID == -1) { - // CONCURRENCY: This will fail sometimes if we send messages to the same peer from different threads. - // This doesn't happen at the moment because we use a single PacketSender for all ports and all peers. - // We might in future split it across multiple threads but it'd be best to keep the same peer on the same thread. - Logger.error(this, "No availiable message ID, requeuing and sending packet (we already checked didn't we???)"); - if(!wasGeneratedPing) { - messageQueue.pushfrontPrioritizedMessageItem(item); - // No point adding to queue if it's just a ping: - // We will try again next time. - // But odds are the connection is broken and the other side isn't responding... + if (!checkedCanSend) { + // Check in advance to avoid reordering message items. + cantSend = !canSend(sessionKey); + } + checkedCanSend = false; + if (cantSend) break; + boolean wasGeneratedPing = false; + + MessageItem item = messageQueue.grabQueuedMessageItem(i); + if (item == null) { + if (mustSendKeepalive && packet.noFragments()) { + // Create a ping for keepalive purposes. + // It will be acked, this ensures both sides don't timeout. + Message msg; + synchronized (this) { + msg = DMT.createFNPPing(pingCounter++); } - break fragments; + item = new MessageItem(msg, null, null); + item.setDeadline(now + PacketSender.MAX_COALESCING_DELAY); + wasGeneratedPing = true; + // Should we report this on the PeerNode's stats? We'd need to run a job off-thread, so probably not worth it. + } else { + break; } - - if(logDEBUG) Logger.debug(this, "Allocated "+messageID+" for "+item+" for "+this); - - MessageWrapper wrapper = new MessageWrapper(item, messageID); - MessageFragment frag = wrapper.getMessageFragment(maxPacketSize - packet.getLength()); - if(frag == null) { + } + + int messageID = getMessageID(); + if (messageID == -1) { + // CONCURRENCY: This will fail sometimes if we send messages to the same peer from different threads. + // This doesn't happen at the moment because we use a single PacketSender for all ports and all peers. + // We might in future split it across multiple threads but it'd be best to keep the same peer on the same thread. + Logger.error(this, "No availiable message ID, requeuing and sending packet (we already checked didn't we???)"); + if (!wasGeneratedPing) { messageQueue.pushfrontPrioritizedMessageItem(item); - break prio; - } - packet.addMessageFragment(frag); - sentPacket.addFragment(frag); - - //Priority of the one we grabbed might be higher than i - Map queue = startedByPrio.get(item.getPriority()); - synchronized(sendBufferLock) { - // CONCURRENCY: This could go over the limit if we allow createPacket() for the same node on two threads in parallel. That's probably a bad idea anyway. - sendBufferUsed += item.buf.length; - if(logDEBUG) Logger.debug(this, "Added " + item.buf.length + " to remote buffer. Total is now " + sendBufferUsed + " for "+pn.shortToString()); - queue.put(messageID, wrapper); + // No point adding to queue if it's just a ping: + // We will try again next time. + // But odds are the connection is broken and the other side isn't responding... } - - if(wrapper.allSent()) { - if((haveAddedStatsBulk == null) && wrapper.getItem().sendLoadBulk) { - addStatsBulk = true; - break; - } - if((haveAddedStatsRT == null) && wrapper.getItem().sendLoadRT) { - addStatsRT = true; - break; - } - } - + break fragments; } - - if(!(addStatsBulk || addStatsRT)) break; - - if(addStatsBulk) { - MessageItem item = pn.makeLoadStats(false, false, true); - if(item != null) { - byte[] buf = item.getData(); - haveAddedStatsBulk = item.buf; - // FIXME if this fails, drop some messages. - packet.addLossyMessage(buf, maxPacketSize); - } + + if (logDEBUG) + Logger.debug(this, "Allocated " + messageID + " for " + item + " for " + this); + + MessageWrapper wrapper = new MessageWrapper(item, messageID); + MessageFragment frag = wrapper.getMessageFragment(maxPacketSize - packet.getLength()); + if (frag == null) { + messageQueue.pushfrontPrioritizedMessageItem(item); + break; } - - if(addStatsRT) { - MessageItem item = pn.makeLoadStats(true, false, true); - if(item != null) { - byte[] buf = item.getData(); - haveAddedStatsRT = item.buf; - // FIXME if this fails, drop some messages. - packet.addLossyMessage(buf, maxPacketSize); - } + packet.addMessageFragment(frag); + sentPacket.addFragment(frag); + + //Priority of the one we grabbed might be higher than i + Map queue = startedByPrio.get(item.getPriority()); + synchronized (sendBufferLock) { + // CONCURRENCY: This could go over the limit if we allow createPacket() for the same node on two threads in parallel. That's probably a bad idea anyway. + sendBufferUsed += item.buf.length; + if (logDEBUG) + Logger.debug(this, "Added " + item.buf.length + " to remote buffer. Total is now " + sendBufferUsed + " for " + pn.shortToString()); + queue.put(messageID, wrapper); } - - if(cantSend) break; - } + } } } diff --git a/src/freenet/node/NodeDispatcher.java b/src/freenet/node/NodeDispatcher.java index e0393619d9c..3230b788f7b 100644 --- a/src/freenet/node/NodeDispatcher.java +++ b/src/freenet/node/NodeDispatcher.java @@ -289,9 +289,6 @@ public boolean handleMessage(Message m) { private void rejectRequest(Message m, ByteCounter ctr) { long uid = m.getLong(DMT.UID); Message msg = DMT.createFNPRejectedOverload(uid, true, false, false); - // Send the load status anyway, hopefully this is a temporary problem. - msg.setNeedsLoadBulk(); - msg.setNeedsLoadRT(); try { m.getSource().sendAsync(msg, null, ctr); } catch (NotConnectedException e) { diff --git a/src/freenet/node/NodeStats.java b/src/freenet/node/NodeStats.java index 986edcc4400..3fb4374144f 100644 --- a/src/freenet/node/NodeStats.java +++ b/src/freenet/node/NodeStats.java @@ -1,5 +1,10 @@ package freenet.node; +import static java.util.concurrent.TimeUnit.DAYS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.text.DecimalFormat; import java.text.NumberFormat; import java.util.Arrays; @@ -40,11 +45,6 @@ import freenet.support.math.TimeDecayingRunningAverage; import freenet.support.math.TrivialRunningAverage; -import static java.util.concurrent.TimeUnit.DAYS; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.MINUTES; -import static java.util.concurrent.TimeUnit.SECONDS; - /** Node (as opposed to NodeClientCore) level statistics. Includes shouldRejectRequest(), but not limited * to stuff required to implement that. */ public class NodeStats implements Persistable, BlockTimeCallback { @@ -1430,15 +1430,6 @@ private String checkBandwidthLiability(double bandwidthAvailableOutputUpperLimit double thisAllocation = getPeerLimit(source, bandwidthAvailableOutputUpperLimit - bandwidthAvailableOutputLowerLimit, input, transfersPerInsert, realTimeFlag, peers, peerRequestsSnapshot.calculateSR(ignoreLocalVsRemoteBandwidthLiability, input)); - if(SEND_LOAD_STATS_NOTICES && source != null) { - // FIXME tell local as well somehow? - if(!input) { - source.onSetMaxOutputTransfers(realTimeFlag, maxOutputTransfers); - source.onSetMaxOutputTransfersPeerLimit(realTimeFlag, maxOutputTransfersPeerLimit); - } - source.onSetPeerAllocation(input, (int)thisAllocation, transfersPerInsert, maxOutputTransfers, realTimeFlag); - } - // Ignore the upper limit. // Because we reassignToSelf() in various tricky timeout conditions, it is possible to exceed it. // Even if we do we still need to allow the guaranteed allocation for each peer. @@ -1516,10 +1507,6 @@ private String checkMaxOutputTransfers(int maxOutputTransfers, return "TooManyTransfers: Fair sharing between peers"; } - - - static final boolean SEND_LOAD_STATS_NOTICES = true; - /** * @param source The peer. * @param totalGuaranteedBandwidth The difference between the upper and lower overall @@ -3556,10 +3543,6 @@ public TimedStats[] getDatabaseJobExecutionStatistics() { return entries; } - public PeerLoadStats createPeerLoadStats(PeerNode peer, int transfersPerInsert, boolean realTimeFlag) { - return new PeerLoadStats(peer, transfersPerInsert, realTimeFlag); - } - public PeerLoadStats parseLoadStats(PeerNode source, Message m) { return new PeerLoadStats(source, m); } diff --git a/src/freenet/node/PeerMessageQueue.java b/src/freenet/node/PeerMessageQueue.java index 75da9af4f94..fd69dfe7a14 100644 --- a/src/freenet/node/PeerMessageQueue.java +++ b/src/freenet/node/PeerMessageQueue.java @@ -13,7 +13,6 @@ import freenet.support.LogThresholdCallback; import freenet.support.Logger; import freenet.support.Logger.LogLevel; -import freenet.support.MutableBoolean; /** * Queue of messages to send to a node. Ordered first by priority then by time. @@ -37,10 +36,7 @@ public void shouldUpdate(){ private final PrioQueue[] queuesByPriority; - private boolean mustSendLoadRT; - private boolean mustSendLoadBulk; - - private class PrioQueue { + private static class PrioQueue { // FIXME refactor into PrioQueue and RoundRobinByUIDPrioQueue PrioQueue(long timeout, boolean timeoutSinceLastSend) { @@ -54,7 +50,7 @@ private class PrioQueue { * to the last send. Block transfers need this - both realtime and bulk. */ final boolean roundRobinBetweenUIDs; - private class Items extends DoublyLinkedListImpl.Item { + private static class Items extends DoublyLinkedListImpl.Item { /** List of messages to send. Stuff to send first is at the beginning. */ final LinkedList items; final long id; @@ -442,7 +438,7 @@ public int addSize(int length, int maxSize) { return length; } - private MessageItem addNonUrgentMessages(long now, MutableBoolean addPeerLoadStatsRT, MutableBoolean addPeerLoadStatsBulk) { + private MessageItem addNonUrgentMessages(long now) { if(logMINOR) checkOrder(); if(itemsNonUrgent == null) return null; MessageItem ret; @@ -491,13 +487,6 @@ private MessageItem addNonUrgentMessages(long now, MutableBoolean addPeerLoadSta } } } - if(mustSendLoadRT && item.sendLoadRT && !addPeerLoadStatsRT.value) { - addPeerLoadStatsRT.value = true; - mustSendLoadRT = false; - } else if(mustSendLoadBulk && item.sendLoadBulk && !addPeerLoadStatsBulk.value) { - addPeerLoadStatsBulk.value = true; - mustSendLoadBulk = false; - } if(logMINOR) checkOrder(); if(ret != null) return ret; @@ -516,7 +505,7 @@ private MessageItem addNonUrgentMessages(long now, MutableBoolean addPeerLoadSta * @return the size of messages, multiplied by -1 if there were * messages that didn't fit */ - private MessageItem addUrgentMessages(long now, MutableBoolean addPeerLoadStatsRT, MutableBoolean addPeerLoadStatsBulk) { + private MessageItem addUrgentMessages(long now) { if(logMINOR) checkOrder(); MessageItem ret; while(true) { @@ -560,13 +549,6 @@ private MessageItem addUrgentMessages(long now, MutableBoolean addPeerLoadStatsR else list = prev.getNext(); ret = item; - if(mustSendLoadRT && item.sendLoadRT && !addPeerLoadStatsRT.value) { - addPeerLoadStatsRT.value = true; - mustSendLoadRT = false; - } else if(mustSendLoadBulk && item.sendLoadBulk && !addPeerLoadStatsBulk.value) { - addPeerLoadStatsBulk.value = true; - mustSendLoadBulk = false; - } if(logMINOR) checkOrder(); if(ret != null) return ret; } @@ -583,15 +565,11 @@ private MessageItem addUrgentMessages(long now, MutableBoolean addPeerLoadStatsR * @param size * @param now * @param messages - * @param addPeerLoadStatsRT Will be set if the caller needs to include a load stats message for - * realtime (i.e. a realtime request completes etc). - * @param addPeerLoadStatsBulk Will be set if the caller needs to include a load stats message for - * bulk (i.e. a bulk request completes etc). * @param incomplete Will be set if there were more messages but they did not fit. If this is * not set, we can try another priority. * @return */ - MessageItem addPriorityMessages(long now, MutableBoolean addPeerLoadStatsRT, MutableBoolean addPeerLoadStatsBulk) { + MessageItem addPriorityMessages(long now) { // Urgent messages first. if(logMINOR) { int nonEmpty = nonEmptyItemsWithID == null ? 0 : nonEmptyItemsWithID.size(); @@ -606,13 +584,13 @@ MessageItem addPriorityMessages(long now, MutableBoolean addPeerLoadStatsRT, Mut moveToUrgent(now); clearOldNonUrgent(now); if(roundRobinBetweenUIDs) { - MessageItem item = addUrgentMessages(now, addPeerLoadStatsRT, addPeerLoadStatsBulk); + MessageItem item = addUrgentMessages(now); if(item != null) return item; } else { assert(itemsByID == null); } // If no more urgent messages, try to add some non-urgent messages too. - return addNonUrgentMessages(now, addPeerLoadStatsRT, addPeerLoadStatsBulk); + return addNonUrgentMessages(now); } private void clearOldNonUrgent(long now) { @@ -774,10 +752,6 @@ private synchronized void enqueuePrioritizedMessageItem(MessageItem addMe) { //Assume it goes on the end, both the common case short prio = addMe.getPriority(); queuesByPriority[prio].addLast(addMe); - if(addMe.sendLoadRT) - mustSendLoadRT = true; - if(addMe.sendLoadBulk) - mustSendLoadBulk = true; } /** @@ -790,10 +764,6 @@ synchronized void pushfrontPrioritizedMessageItem(MessageItem addMe) { //Assume it goes on the front short prio = addMe.getPriority(); queuesByPriority[prio].addFirst(addMe); - if(addMe.sendLoadRT) - mustSendLoadRT = true; - if(addMe.sendLoadBulk) - mustSendLoadBulk = true; } public synchronized MessageItem[] grabQueuedMessageItems() { @@ -862,17 +832,11 @@ public synchronized boolean mustSendSize(int minSize, int maxSize) { * check in advance if possible. */ public synchronized MessageItem grabQueuedMessageItem(int minPriority) { long now = System.currentTimeMillis(); - - MutableBoolean addPeerLoadStatsRT = new MutableBoolean(); - MutableBoolean addPeerLoadStatsBulk = new MutableBoolean(); - - addPeerLoadStatsRT.value = true; - addPeerLoadStatsBulk.value = true; - + for(int i=0;i 5000) { - if(logMINOR) Logger.minor(this, "Last sent allocation "+TimeUtil.formatTime(now - timeLastSentAllocationNotice)); - mustSend = true; - } else { - if(thisAllocation > last * 1.05) { - if(logMINOR) Logger.minor(this, "Last allocation was "+last+" this is "+thisAllocation); - mustSend = true; - } else if(thisAllocation < last * 0.9) { - if(logMINOR) Logger.minor(this, "Last allocation was "+last+" this is "+thisAllocation); - mustSend = true; - } - } - if(!mustSend) return; - sendASAP = true; - } - if(!mustSend) return; - } - - public void onSetMaxOutputTransfers(int maxOutputTransfers) { - synchronized(this) { - if(maxOutputTransfers == lastSentMaxOutputTransfers) return; - if(lastSentMaxOutputTransfers == Integer.MAX_VALUE || lastSentMaxOutputTransfers == 0) { - sendASAP = true; - } else if(maxOutputTransfers > lastSentMaxOutputTransfers * 1.05 || maxOutputTransfers < lastSentMaxOutputTransfers * 0.9) { - sendASAP = true; - } - } - } - - public void onSetMaxOutputTransfersPeerLimit(int maxOutputTransfersPeerLimit) { - synchronized(this) { - if(maxOutputTransfersPeerLimit == lastSentMaxOutputTransfersPeerLimit) return; - if(lastSentMaxOutputTransfersPeerLimit == Integer.MAX_VALUE || lastSentMaxOutputTransfersPeerLimit == 0) { - sendASAP = true; - } else if(maxOutputTransfersPeerLimit > lastSentMaxOutputTransfersPeerLimit * 1.05 || maxOutputTransfersPeerLimit < lastSentMaxOutputTransfersPeerLimit * 0.9) { - sendASAP = true; - } - } - } - - Message makeLoadStats(long now, int transfersPerInsert, boolean noRemember) { - PeerLoadStats stats = node.getNodeStats().createPeerLoadStats(PeerNode.this, transfersPerInsert, realTimeFlag); - synchronized(this) { - lastSentAllocationInput = (int) stats.inputBandwidthPeerLimit; - lastSentAllocationOutput = (int) stats.outputBandwidthPeerLimit; - lastSentMaxOutputTransfers = stats.maxTransfersOut; - if(!noRemember) { - if(lastFullStats != null && lastFullStats.equals(stats)) return null; - lastFullStats = stats; - } - timeLastSentAllocationNotice = now; - countAllocationNotices++; - if(logMINOR) Logger.minor(this, "Sending allocation notice to "+this+" allocation is "+lastSentAllocationInput+" input "+lastSentAllocationOutput+" output."); - } - Message msg = DMT.createFNPPeerLoadStatus(stats); - return msg; - } - - public synchronized boolean grabSendASAP() { - boolean send = sendASAP; - sendASAP = false; - return send; - } - - public synchronized void setSendASAP() { - sendASAP = true; - } - - } - void removeUIDsFromMessageQueues(Long[] list) { this.messageQueue.removeUIDsFromMessageQueues(list); } - public void onSetMaxOutputTransfers(boolean realTime, int maxOutputTransfers) { - (realTime ? loadSenderRealTime : loadSenderBulk).onSetMaxOutputTransfers(maxOutputTransfers); - } - - public void onSetMaxOutputTransfersPeerLimit(boolean realTime, int maxOutputTransfers) { - (realTime ? loadSenderRealTime : loadSenderBulk).onSetMaxOutputTransfersPeerLimit(maxOutputTransfers); - } - - public void onSetPeerAllocation(boolean input, int thisAllocation, int transfersPerInsert, int maxOutputTransfers, boolean realTime) { - (realTime ? loadSenderRealTime : loadSenderBulk).onSetPeerAllocation(input, thisAllocation, transfersPerInsert); - } - - public class IncomingLoadSummaryStats { + public static class IncomingLoadSummaryStats { public IncomingLoadSummaryStats(int totalRequests, double outputBandwidthPeerLimit, double inputBandwidthPeerLimit, @@ -4992,7 +4873,7 @@ public IncomingLoadSummaryStats getIncomingLoadStats() { RunningRequestsSnapshot runningRequests = node.getNodeStats().getRunningRequestsTo(PeerNode.this, loadStats.averageTransfersOutPerInsert, realTime); RunningRequestsSnapshot otherRunningRequests = loadStats.getOtherRunningRequests(); boolean ignoreLocalVsRemoteBandwidthLiability = node.getNodeStats().ignoreLocalVsRemoteBandwidthLiability(); - return new IncomingLoadSummaryStats(runningRequests.totalRequests(), + return new IncomingLoadSummaryStats(runningRequests.totalRequests(), loadStats.outputBandwidthPeerLimit, loadStats.inputBandwidthPeerLimit, loadStats.outputBandwidthUpperLimit, loadStats.inputBandwidthUpperLimit, runningRequests.calculate(ignoreLocalVsRemoteBandwidthLiability, false), @@ -5292,10 +5173,6 @@ public IncomingLoadSummaryStats getIncomingLoadStats(boolean realTime) { return outputLoadTracker(realTime).getIncomingLoadStats(); } - public LoadSender loadSender(boolean realtime) { - return realtime ? loadSenderRealTime : loadSenderBulk; - } - /** A fatal timeout occurred, and we don't know whether the peer is still running the * request we passed in for us. If it is, we cannot reuse that slot. So we need to * query it periodically until it is no longer running it. If we cannot send the query @@ -5457,25 +5334,6 @@ public synchronized boolean matchesIP(FreenetInetAddress addr, boolean strict) { return false; } - @Override - public MessageItem makeLoadStats(boolean realtime, boolean boostPriority, boolean noRemember) { - // FIXME re-enable when try NLM again. - return null; -// Message msg = loadSender(realtime).makeLoadStats(System.currentTimeMillis(), node.nodeStats.outwardTransfersPerInsert(), noRemember); -// if(msg == null) return null; -// return new MessageItem(msg, null, node.nodeStats.allocationNoticesCounter, boostPriority ? DMT.PRIORITY_NOW : (short)-1); - } - - @Override - public boolean grabSendLoadStatsASAP(boolean realtime) { - return loadSender(realtime).grabSendASAP(); - } - - @Override - public void setSendLoadStatsASAP(boolean realtime) { - loadSender(realtime).setSendASAP(); - } - @Override public DecodingMessageGroup startProcessingDecryptedMessages(int size) { return new MyDecodingMessageGroup(size); diff --git a/test/freenet/node/MessageWrapperTest.java b/test/freenet/node/MessageWrapperTest.java index 65a3401bcda..499902ea112 100644 --- a/test/freenet/node/MessageWrapperTest.java +++ b/test/freenet/node/MessageWrapperTest.java @@ -1,13 +1,18 @@ package freenet.node; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.junit.Test; public class MessageWrapperTest { @Test public void testGetFragment() { - MessageItem item = new MessageItem(new byte[1024], null, false, null, (short) 0, false, false); + MessageItem item = new MessageItem(new byte[1024], null, false, null, (short) 0); MessageWrapper wrapper = new MessageWrapper(item, 0); MessageFragment frag = wrapper.getMessageFragment(128); @@ -53,7 +58,7 @@ public void testGetFragment() { @Test public void testGetFragmentWithLoss() { - MessageItem item = new MessageItem(new byte[363], null, false, null, (short) 0, false, false); + MessageItem item = new MessageItem(new byte[363], null, false, null, (short) 0); MessageWrapper wrapper = new MessageWrapper(item, 0); MessageFragment frag1 = wrapper.getMessageFragment(128); @@ -107,7 +112,7 @@ public void testGetFragmentWithLoss() { @Test public void testLost() { - MessageItem item = new MessageItem(new byte[363], null, false, null, (short) 0, false, false); + MessageItem item = new MessageItem(new byte[363], null, false, null, (short) 0); MessageWrapper wrapper = new MessageWrapper(item, 0); MessageFragment frag = wrapper.getMessageFragment(128); diff --git a/test/freenet/node/NewPacketFormatTest.java b/test/freenet/node/NewPacketFormatTest.java index a2ca6d5983b..4841cc9174c 100644 --- a/test/freenet/node/NewPacketFormatTest.java +++ b/test/freenet/node/NewPacketFormatTest.java @@ -3,25 +3,23 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.node; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.Random; -import org.junit.Before; -import org.junit.Test; - import freenet.crypt.BlockCipher; import freenet.crypt.DummyRandomSource; import freenet.crypt.ciphers.Rijndael; -import freenet.io.comm.DMT; import freenet.io.comm.FreenetInetAddress; -import freenet.io.comm.Message; import freenet.io.comm.Peer; -import freenet.support.MutableBoolean; +import org.junit.Before; +import org.junit.Test; public class NewPacketFormatTest { @Before @@ -74,7 +72,7 @@ public void testLostLastAck() throws BlockedTooLongException, InterruptedExcepti senderNode.currentKey = senderKey; SessionKey receiverKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); - senderQueue.queueAndEstimateSize(new MessageItem(new byte[1024], null, false, null, (short) 0, false, false), 1024); + senderQueue.queueAndEstimateSize(new MessageItem(new byte[1024], null, false, null, (short) 0), 1024); NPFPacket fragment1 = sender.createPacket(512, senderQueue, senderKey, false); assertEquals(1, fragment1.getFragments().size()); @@ -123,7 +121,7 @@ public void testOutOfOrderDelivery() throws BlockedTooLongException { SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); SessionKey receiverKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); - senderQueue.queueAndEstimateSize(new MessageItem(new byte[1024], null, false, null, (short) 0, false, false), 1024); + senderQueue.queueAndEstimateSize(new MessageItem(new byte[1024], null, false, null, (short) 0), 1024); NPFPacket fragment1 = sender.createPacket(512, senderQueue, senderKey, false); assertEquals(1, fragment1.getFragments().size()); @@ -149,7 +147,7 @@ public void testReceiveUnknownMessageLength() throws BlockedTooLongException { SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); SessionKey receiverKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); - senderQueue.queueAndEstimateSize(new MessageItem(new byte[1024], null, false, null, (short) 0, false, false), 1024); + senderQueue.queueAndEstimateSize(new MessageItem(new byte[1024], null, false, null, (short) 0), 1024); NPFPacket fragment1 = sender.createPacket(512, senderQueue, senderKey, false); assertEquals(1, fragment1.getFragments().size()); @@ -173,7 +171,7 @@ public void testResendAlreadyCompleted() throws BlockedTooLongException, Interru SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); SessionKey receiverKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); - senderQueue.queueAndEstimateSize(new MessageItem(new byte[128], null, false, null, (short) 0, false, false), 1024); + senderQueue.queueAndEstimateSize(new MessageItem(new byte[128], null, false, null, (short) 0), 1024); Thread.sleep(PacketSender.MAX_COALESCING_DELAY*2); NPFPacket packet1 = sender.createPacket(512, senderQueue, senderKey, false); @@ -185,172 +183,7 @@ public void testResendAlreadyCompleted() throws BlockedTooLongException, Interru //Same message, new sequence number ie. resend assertEquals(0, receiver.handleDecryptedPacket(packet1, receiverKey).size()); } - - // Test sending it when the peer wants it to be sent. This is as a real message, *not* as a lossy message. - @Test - public void testLoadStatsSendWhenPeerWants() throws BlockedTooLongException, InterruptedException { - final Message loadMessage = DMT.createFNPVoid(); - final MutableBoolean gotMessage = new MutableBoolean(); - final SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); - NullBasePeerNode senderNode = new NullBasePeerNode() { - - boolean shouldSend = true; - - @Override - public MessageItem makeLoadStats(boolean realtime, boolean highPriority, boolean noRemember) { - return new MessageItem(loadMessage, null, null, (short)0); - } - - @Override - public synchronized boolean grabSendLoadStatsASAP(boolean realtime) { - boolean ret = shouldSend; - shouldSend = false; - return ret; - } - - @Override - public synchronized void setSendLoadStatsASAP(boolean realtime) { - shouldSend = true; - } - - @Override - public SessionKey getCurrentKeyTracker() { - return senderKey; - } - - }; - NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0); - PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234)); - NullBasePeerNode receiverNode = new NullBasePeerNode() { - - @Override - public void handleMessage(Message msg) { - assert(msg.getSpec().equals(DMT.FNPVoid)); - synchronized(gotMessage) { - gotMessage.value = true; - } - } - - @Override - public void processDecryptedMessage(byte[] data, int offset, int length, int overhead) { - Message m = Message.decodeMessageFromPacket(data, offset, length, this, overhead); - if(m != null) { - handleMessage(m); - } - } - - }; - NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0); - SessionKey receiverKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); - - senderQueue.queueAndEstimateSize(new MessageItem(new byte[128], null, false, null, (short) 0, false, false), 1024); - Thread.sleep(PacketSender.MAX_COALESCING_DELAY*2); - NPFPacket packet1 = sender.createPacket(512, senderQueue, senderKey, false); - assert(packet1.getLossyMessages().isEmpty()); - assert(packet1.getFragments().size() == 2); - synchronized(gotMessage) { - assert(!gotMessage.value); - } - List finished = receiver.handleDecryptedPacket(packet1, receiverKey); - assertEquals(2, finished.size()); - DecodingMessageGroup decoder = receiverNode.startProcessingDecryptedMessages(finished.size()); - for(byte[] buffer : finished) { - decoder.processDecryptedMessage(buffer, 0, buffer.length, 0); - } - decoder.complete(); - - synchronized(gotMessage) { - assert(gotMessage.value); - } - } - - // Test sending it as a per-packet lossy message. - @Test - public void testLoadStatsLowLevel() throws BlockedTooLongException, InterruptedException { - final byte[] loadMessage = - new byte[] { (byte)0xFF, (byte)0xEE, (byte)0xDD, (byte)0xCC, (byte)0xBB, (byte)0xAA}; - final SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); - NullBasePeerNode senderNode = new NullBasePeerNode() { - - @Override - public MessageItem makeLoadStats(boolean realtime, boolean highPriority, boolean noRemember) { - return new MessageItem(loadMessage, null, false, null, (short) 0, false, false); - } - - @Override - public SessionKey getCurrentKeyTracker() { - return senderKey; - } - - }; - NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0); - PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234)); - - senderQueue.queueAndEstimateSize(new MessageItem(new byte[128], null, false, null, (short) 0, false, true), 1024); - - Thread.sleep(PacketSender.MAX_COALESCING_DELAY*2); - NPFPacket packet1 = sender.createPacket(512, senderQueue, senderKey, false); - assertTrue(packet1 != null); - assertEquals(1, packet1.getFragments().size()); - assertEquals(1, packet1.getLossyMessages().size()); - NPFPacketTest.checkEquals(loadMessage, packet1.getLossyMessages().get(0)); - // Don't decode the packet because it's not a real message. - } - - // Test sending load message as a per-packet lossy message, including message decoding. - @Test - public void testLoadStatsHighLevel() throws BlockedTooLongException, InterruptedException { - final Message loadMessage = DMT.createFNPVoid(); - final MutableBoolean gotMessage = new MutableBoolean(); - NullBasePeerNode senderNode = new NullBasePeerNode() { - - @Override - public MessageItem makeLoadStats(boolean realtime, boolean highPriority, boolean noRemember) { - return new MessageItem(loadMessage, null, null, (short)0); - } - - @Override - public void handleMessage(Message msg) { - assert(msg.getSpec().equals(DMT.FNPVoid)); - synchronized(gotMessage) { - gotMessage.value = true; - } - } - - }; - NewPacketFormat sender = new NewPacketFormat(senderNode, 0, 0); - PeerMessageQueue senderQueue = new PeerMessageQueue(new DummyRandomSource(1234)); - NullBasePeerNode receiverNode = new NullBasePeerNode() { - - @Override - public void handleMessage(Message msg) { - assert(msg.getSpec().equals(DMT.FNPVoid)); - synchronized(gotMessage) { - gotMessage.value = true; - } - } - - }; - NewPacketFormat receiver = new NewPacketFormat(receiverNode, 0, 0); - SessionKey senderKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); - SessionKey receiverKey = new SessionKey(null, null, null, null, null, null, null, null, new NewPacketFormatKeyContext(0, 0), 1); - - senderQueue.queueAndEstimateSize(new MessageItem(new byte[128], null, false, null, (short) 0, false, true), 1024); - - Thread.sleep(PacketSender.MAX_COALESCING_DELAY*2); - NPFPacket packet1 = sender.createPacket(512, senderQueue, senderKey, false); - assertEquals(1, packet1.getFragments().size()); - assertEquals(1, packet1.getLossyMessages().size()); - synchronized(gotMessage) { - assert(!gotMessage.value); - } - assertEquals(1, receiver.handleDecryptedPacket(packet1, receiverKey).size()); - synchronized(gotMessage) { - assert(gotMessage.value); - } - } - /* This checks the output of the sequence number encryption function to * make sure it doesn't change accidentally. */ @Test @@ -442,8 +275,7 @@ public void testEncryption() byte[] copyOfMessage = Arrays.copyOf(message, message.length); senderQueue.queueAndEstimateSize( - new MessageItem(message, null, false, null, (short) 0, false, - false), 1024); + new MessageItem(message, null, false, null, (short) 0), 1024); senderNode.messageQueue = senderQueue; Thread.sleep(PacketSender.MAX_COALESCING_DELAY * 2); diff --git a/test/freenet/node/NullBasePeerNode.java b/test/freenet/node/NullBasePeerNode.java index c41050a8746..ff51457a968 100644 --- a/test/freenet/node/NullBasePeerNode.java +++ b/test/freenet/node/NullBasePeerNode.java @@ -9,9 +9,9 @@ import freenet.io.comm.Message; import freenet.io.comm.NotConnectedException; import freenet.io.comm.Peer; +import freenet.io.comm.Peer.LocalAddressException; import freenet.io.comm.PeerContext; import freenet.io.comm.SocketHandler; -import freenet.io.comm.Peer.LocalAddressException; import freenet.io.xfer.PacketThrottle; /** Tests can override this to record specific events e.g. rekey */ @@ -223,22 +223,6 @@ public void handleMessage(Message msg) { throw new UnsupportedOperationException(); } - @Override - public MessageItem makeLoadStats(boolean realtime, boolean highPriority, boolean lossy) { - // Don't send load stats. - return null; - } - - @Override - public boolean grabSendLoadStatsASAP(boolean realtime) { - return false; - } - - @Override - public void setSendLoadStatsASAP(boolean realtime) { - throw new UnsupportedOperationException(); - } - @Override public void reportThrottledPacketSendTime(long time, boolean realTime) { // Ignore. diff --git a/test/freenet/node/PeerMessageQueueTest.java b/test/freenet/node/PeerMessageQueueTest.java index eccbc2701f1..ecebba0687d 100644 --- a/test/freenet/node/PeerMessageQueueTest.java +++ b/test/freenet/node/PeerMessageQueueTest.java @@ -3,7 +3,9 @@ * http://www.gnu.org/ for further details of the GPL. */ package freenet.node; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; import org.junit.Test; @@ -22,7 +24,7 @@ public void testUrgentTime() { //Constructor might take some time, so grab a range long start = System.currentTimeMillis(); - MessageItem item = new MessageItem(new byte[1024], null, false, null, (short) 0, false, false); + MessageItem item = new MessageItem(new byte[1024], null, false, null, (short) 0); long end = System.currentTimeMillis(); pmq.queueAndEstimateSize(item, 1024); @@ -43,7 +45,7 @@ public void testUrgentTimeQueuedWrong() { //Constructor might take some time, so grab a range long start = System.currentTimeMillis(); - MessageItem itemUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0, false, false); + MessageItem itemUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0); long end = System.currentTimeMillis(); //Sleep for a little while to get a later timeout @@ -53,7 +55,7 @@ public void testUrgentTimeQueuedWrong() { } - MessageItem itemNonUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0, false, false); + MessageItem itemNonUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0); //Queue the least urgent item first to get the wrong order pmq.queueAndEstimateSize(itemNonUrgent, 1024); @@ -71,7 +73,7 @@ public void testUrgentTimeQueuedWrong() { public void testGrabQueuedMessageItem() { PeerMessageQueue pmq = new PeerMessageQueue(new DummyRandomSource(1234)); - MessageItem itemUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0, false, false); + MessageItem itemUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0); //Sleep for a little while to get a later timeout try { @@ -80,7 +82,7 @@ public void testGrabQueuedMessageItem() { } - MessageItem itemNonUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0, false, false); + MessageItem itemNonUrgent = new MessageItem(new byte[1024], null, false, null, (short) 0); //Queue the least urgent item first to get the wrong order pmq.queueAndEstimateSize(itemNonUrgent, 1024);