diff --git a/WaarpCommon/src/main/java/org/waarp/common/utility/WaarpNettyUtil.java b/WaarpCommon/src/main/java/org/waarp/common/utility/WaarpNettyUtil.java index 12fb3f833c..5b84ce1bc4 100644 --- a/WaarpCommon/src/main/java/org/waarp/common/utility/WaarpNettyUtil.java +++ b/WaarpCommon/src/main/java/org/waarp/common/utility/WaarpNettyUtil.java @@ -24,6 +24,7 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.Future; @@ -35,7 +36,11 @@ public final class WaarpNettyUtil { private static final int TIMEOUT_MILLIS = 1000; + // Default optimal value for Waarp private static final int BUFFER_SIZE_1MB = 1048576; + // Default optimal value from Netty (tested as correct for Waarp) + private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024; + private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024; private WaarpNettyUtil() { } @@ -57,6 +62,9 @@ public static void setBootstrap(Bootstrap bootstrap, EventLoopGroup group, bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); bootstrap.option(ChannelOption.SO_RCVBUF, BUFFER_SIZE_1MB); bootstrap.option(ChannelOption.SO_SNDBUF, BUFFER_SIZE_1MB); + bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, + new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, + DEFAULT_HIGH_WATER_MARK)); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } @@ -79,6 +87,9 @@ public static void setServerBootstrap(ServerBootstrap bootstrap, bootstrap.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); bootstrap.childOption(ChannelOption.SO_RCVBUF, BUFFER_SIZE_1MB); bootstrap.childOption(ChannelOption.SO_SNDBUF, BUFFER_SIZE_1MB); + bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, + new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, + DEFAULT_HIGH_WATER_MARK)); bootstrap .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } diff --git a/WaarpR66/src/main/java/org/waarp/openr66/database/data/DbTaskRunner.java b/WaarpR66/src/main/java/org/waarp/openr66/database/data/DbTaskRunner.java index 18b6d8cba5..c7be710037 100644 --- a/WaarpR66/src/main/java/org/waarp/openr66/database/data/DbTaskRunner.java +++ b/WaarpR66/src/main/java/org/waarp/openr66/database/data/DbTaskRunner.java @@ -2150,6 +2150,13 @@ public void setRankAtStartup(int rank) { } } + /** + * @param blocksize the block size to set + */ + public void setBlocksize(int blocksize) { + pojo.setBlockSize(blocksize); + } + /** * @param filename the filename to set */ diff --git a/WaarpR66/src/main/java/org/waarp/openr66/protocol/http/restv2/converters/TransferConverter.java b/WaarpR66/src/main/java/org/waarp/openr66/protocol/http/restv2/converters/TransferConverter.java index 237119e3fe..59014f56b3 100644 --- a/WaarpR66/src/main/java/org/waarp/openr66/protocol/http/restv2/converters/TransferConverter.java +++ b/WaarpR66/src/main/java/org/waarp/openr66/protocol/http/restv2/converters/TransferConverter.java @@ -173,7 +173,7 @@ public static Transfer nodeToNewTransfer(ObjectNode object) { new Transfer(null, null, -1, false, null, null, 65536); defaultTransfer.setRequester(serverName()); defaultTransfer.setOwnerRequest(serverName()); - defaultTransfer.setBlockSize(65536); + defaultTransfer.setBlockSize(Configuration.configuration.getBlockSize()); defaultTransfer.setTransferInfo(""); defaultTransfer.setStart(new Timestamp(DateTime.now().getMillis())); final Transfer transfer = parseNode(object, defaultTransfer); diff --git a/WaarpR66/src/main/java/org/waarp/openr66/protocol/localhandler/TransferActions.java b/WaarpR66/src/main/java/org/waarp/openr66/protocol/localhandler/TransferActions.java index 2ab3e7824e..1f00bdef2c 100644 --- a/WaarpR66/src/main/java/org/waarp/openr66/protocol/localhandler/TransferActions.java +++ b/WaarpR66/src/main/java/org/waarp/openr66/protocol/localhandler/TransferActions.java @@ -212,6 +212,12 @@ public void request(RequestPacket packet) } // Receiver can specify a rank different from database setRankAtStartupFromRequest(packet, runner); + runner.setBlocksize(packet.getBlocksize()); + try { + runner.update(); + } catch (WaarpDatabaseException ignored) { + // Ignore + } logger.debug( "Filesize: " + packet.getOriginalSize() + ':' + runner.isSender()); boolean shouldInformBack = false; @@ -287,6 +293,8 @@ private RequestPacket computeBlockSizeFromRequest(RequestPacket packet, } // Check if the blocksize is greater than local value if (Configuration.configuration.getBlockSize() < blocksize) { + logger.warn("Blocksize is greater than allowed {} < {}", + Configuration.configuration.getBlockSize(), blocksize); blocksize = Configuration.configuration.getBlockSize(); final String sep = localChannelReference.getPartner().getSeperator(); packet = new RequestPacket(packet.getRulename(), packet.getMode(), diff --git a/WaarpR66/src/test/java/org/waarp/openr66/protocol/it/ScenarioBase.java b/WaarpR66/src/test/java/org/waarp/openr66/protocol/it/ScenarioBase.java index 49b681c7ec..f21d847506 100644 --- a/WaarpR66/src/test/java/org/waarp/openr66/protocol/it/ScenarioBase.java +++ b/WaarpR66/src/test/java/org/waarp/openr66/protocol/it/ScenarioBase.java @@ -84,6 +84,9 @@ public abstract class ScenarioBase extends TestAbstract { private static final String TMP_R66_CONFIG_R1 = "/tmp/R66/scenario_1_2_3/" + SERVER_1_REWRITTEN_XML; public static int NUMBER_FILES = 50; + public static int LARGE_SIZE = 2000000; + public static int BLOCK_SIZE = 8192; + private static int r66Pid1 = 999999; private static int r66Pid2 = 999999; private static int r66Pid3 = 999999; @@ -368,7 +371,7 @@ public void test011_SendToItself() throws IOException, InterruptedException { Assume.assumeNotNull(networkTransaction); File baseDir = new File("/tmp/R66/scenario_1_2_3/R1/out/"); final File totest = - generateOutFile(baseDir.getAbsolutePath() + "/testTask.txt", 1000); + generateOutFile(baseDir.getAbsolutePath() + "/testTask.txt", 100); final R66Future future = new R66Future(true); final SubmitTransfer transaction = new SubmitTransfer(future, "server1-ssl", "testTask.txt", "rule3", @@ -394,7 +397,8 @@ public void test012_MultipleSendsSync() Assume.assumeNotNull(networkTransaction); File baseDir = new File("/tmp/R66/scenario_1_2_3/R2/out/"); File fileOut = new File(baseDir, "hello"); - final File outHello = generateOutFile(fileOut.getAbsolutePath(), 100000); + final File outHello = + generateOutFile(fileOut.getAbsolutePath(), LARGE_SIZE); ArrayList futures = new ArrayList(NUMBER_FILES); ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_FILES); @@ -406,7 +410,7 @@ public void test012_MultipleSendsSync() final TestRecvThroughClient transaction = new TestRecvThroughClient(future, handler, "server2", "hello", "recvthrough", "Test Multiple RecvThrough", - true, 8192, networkTransaction); + true, BLOCK_SIZE, networkTransaction); transaction.setNormalInfoAsWarn(false); executorService.execute(transaction); } @@ -418,11 +422,13 @@ public void test012_MultipleSendsSync() assertTrue(future.isSuccess()); } long timestop = System.currentTimeMillis(); - logger - .warn("RecvThrough {} files from R2" + " ({} seconds, {} per seconds)", - NUMBER_FILES, (timestop - timestart) / 1000, - NUMBER_FILES * 1000 / (timestop - timestart)); + logger.warn( + "RecvThrough {} files from R2 ({} seconds, {} per seconds) of " + + "size {} with block size {}", NUMBER_FILES, + (timestop - timestart) / 1000, + NUMBER_FILES * 1000 / (timestop - timestart), LARGE_SIZE, BLOCK_SIZE); outHello.delete(); + FileUtils.forceDeleteRecursiveDir(baseDir); logger.warn("End {}", Processes.getCurrentMethodName()); } @@ -511,6 +517,48 @@ public void test04_5000_MultipleSends() Thread.sleep(5000); } + @Test + public void test04_5000_MultipleSends_ChangingBlockSize() + throws IOException, InterruptedException { + Assume.assumeTrue("If the Long term tests are allowed", + SystemPropertyUtil.get(IT_LONG_TEST, false)); + int lastNumber = NUMBER_FILES; + NUMBER_FILES = 800; + BLOCK_SIZE = 16 * 1024; + test012_MultipleSendsSync(); + // Extra sleep to check correctness if necessary on Logs + Thread.sleep(1000); + // Ensure the last send is ok + test011_SendToItself(); + // Extra sleep to check correctness if necessary on Logs + Thread.sleep(1000); + BLOCK_SIZE = 64 * 1024; + test012_MultipleSendsSync(); + // Extra sleep to check correctness if necessary on Logs + Thread.sleep(1000); + // Ensure the last send is ok + test011_SendToItself(); + // Extra sleep to check correctness if necessary on Logs + Thread.sleep(1000); + BLOCK_SIZE = 128 * 1024; + test012_MultipleSendsSync(); + // Extra sleep to check correctness if necessary on Logs + Thread.sleep(1000); + // Ensure the last send is ok + test011_SendToItself(); + // Extra sleep to check correctness if necessary on Logs + Thread.sleep(1000); + BLOCK_SIZE = 512 * 1024; + test012_MultipleSendsSync(); + // Extra sleep to check correctness if necessary on Logs + Thread.sleep(1000); + // Ensure the last send is ok + test011_SendToItself(); + // Extra sleep to check correctness if necessary on Logs + Thread.sleep(5000); + NUMBER_FILES = lastNumber; + } + private void waitForAllDone(DbTaskRunner runner) { while (true) { try { diff --git a/WaarpR66/src/test/java/org/waarp/openr66/protocol/junit/NetworkClientTest.java b/WaarpR66/src/test/java/org/waarp/openr66/protocol/junit/NetworkClientTest.java index 2b1b794119..0e012e0ae1 100644 --- a/WaarpR66/src/test/java/org/waarp/openr66/protocol/junit/NetworkClientTest.java +++ b/WaarpR66/src/test/java/org/waarp/openr66/protocol/junit/NetworkClientTest.java @@ -704,6 +704,51 @@ public void test5_DirectTransfer() throws Exception { assertEquals("Errors should be 0", 0, error); } + @Test + public void test5_DirectTransferMultipleBlockSize() throws Exception { + final File totest = generateOutFile("/tmp/R66/out/testTask.txt", 1000000); + final ExecutorService executorService = Executors.newCachedThreadPool(); + final int nb = 10; + final R66Future[] arrayFuture = new R66Future[nb]; + logger.warn("Start Test of DirectTransfer"); + final long time1 = System.currentTimeMillis(); + for (int i = 0; i < nb; i++) { + arrayFuture[i] = new R66Future(true); + final TestTransferNoDb transaction = + new TestTransferNoDb(arrayFuture[i], "hostas", "testTask.txt", + "rule3", "Test SendDirect Small", true, + 8192 * (i + 1) * 2, DbConstantR66.ILLEGALVALUE, + networkTransaction); + executorService.execute(transaction); + } + int success = 0; + int error = 0; + for (int i = 0; i < nb; i++) { + arrayFuture[i].awaitOrInterruptible(); + if (arrayFuture[i].getRunner() != null) { + logger.warn("{} {}", arrayFuture[i].getRunner().getBlocksize(), + 8192 * (i + 1) * 2); + assertTrue( + arrayFuture[i].getRunner().getBlocksize() <= 8192 * (i + 1) * 2); + assertTrue(arrayFuture[i].getRunner().getBlocksize() <= + Configuration.configuration.getBlockSize()); + dbTaskRunners.add(arrayFuture[i].getRunner()); + } + if (arrayFuture[i].isSuccess()) { + success++; + } else { + error++; + } + } + final long time2 = System.currentTimeMillis(); + logger.warn("Success: " + success + " Error: " + error + " NB/s: " + + success * 1000 / (time2 - time1)); + executorService.shutdown(); + totest.delete(); + assertEquals("Success should be total", nb, success); + assertEquals("Errors should be 0", 0, error); + } + @Test public void test5_DirectTransferThroughId() throws Exception { final File totest = generateOutFile("/tmp/R66/out/testTask.txt", 10); diff --git a/WaarpR66/src/test/resources/it/scenario_1_2_3/R1/conf/server_1.xml b/WaarpR66/src/test/resources/it/scenario_1_2_3/R1/conf/server_1.xml index 0b5cdcbe14..09a3a419ec 100644 --- a/WaarpR66/src/test/resources/it/scenario_1_2_3/R1/conf/server_1.xml +++ b/WaarpR66/src/test/resources/it/scenario_1_2_3/R1/conf/server_1.xml @@ -85,6 +85,7 @@ True 5000 600 + 1048576 h2 diff --git a/WaarpR66/src/test/resources/it/scenario_1_2_3/R1/conf/server_1_SQLDB.xml b/WaarpR66/src/test/resources/it/scenario_1_2_3/R1/conf/server_1_SQLDB.xml index b31171776b..af37b6e364 100644 --- a/WaarpR66/src/test/resources/it/scenario_1_2_3/R1/conf/server_1_SQLDB.xml +++ b/WaarpR66/src/test/resources/it/scenario_1_2_3/R1/conf/server_1_SQLDB.xml @@ -84,7 +84,8 @@ 5 True 5000 - 600 + 800 + 1048576 XXXDRIVERXXX diff --git a/WaarpR66/src/test/resources/it/scenario_1_2_3/R2/conf/server_2.xml b/WaarpR66/src/test/resources/it/scenario_1_2_3/R2/conf/server_2.xml index 378e8ffaec..f43c3f89e5 100644 --- a/WaarpR66/src/test/resources/it/scenario_1_2_3/R2/conf/server_2.xml +++ b/WaarpR66/src/test/resources/it/scenario_1_2_3/R2/conf/server_2.xml @@ -84,6 +84,7 @@ 5 True 1000 + 1048576 h2 diff --git a/doc/waarp-r66/source/changes.rst b/doc/waarp-r66/source/changes.rst index 1ec1cff84a..17f29b521c 100644 --- a/doc/waarp-r66/source/changes.rst +++ b/doc/waarp-r66/source/changes.rst @@ -34,6 +34,8 @@ Correctifs request [`#42 `__]) - Correction de l'authentification HMAC de l'API REST v2 (pull request [`#43 `__]) +- Correction d'un bug sur la taille des paquets (pull + request [`#45 `__]) Waarp R66 3.3.3 (2020-05-07) ============================