Skip to content

Commit

Permalink
Fix for Block Size
Browse files Browse the repository at this point in the history
In some cases, the given block size could be wrong on receiver side, in particular
when it receives from its own initiative (Pull) a file and when one of the server
has an upper limit below this requested size.

This fix this issue.
  • Loading branch information
fredericBregier committed Jun 2, 2020
1 parent 4d2d9e4 commit 07caa8a
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
Expand All @@ -35,7 +36,11 @@
public final class WaarpNettyUtil {

private static final int TIMEOUT_MILLIS = 1000;
// Default optimal value for Waarp
private static final int BUFFER_SIZE_1MB = 1048576;
// Default optimal value from Netty (tested as correct for Waarp)
private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;

private WaarpNettyUtil() {
}
Expand All @@ -57,6 +62,9 @@ public static void setBootstrap(Bootstrap bootstrap, EventLoopGroup group,
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
bootstrap.option(ChannelOption.SO_RCVBUF, BUFFER_SIZE_1MB);
bootstrap.option(ChannelOption.SO_SNDBUF, BUFFER_SIZE_1MB);
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK,
DEFAULT_HIGH_WATER_MARK));
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}

Expand All @@ -79,6 +87,9 @@ public static void setServerBootstrap(ServerBootstrap bootstrap,
bootstrap.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout);
bootstrap.childOption(ChannelOption.SO_RCVBUF, BUFFER_SIZE_1MB);
bootstrap.childOption(ChannelOption.SO_SNDBUF, BUFFER_SIZE_1MB);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK,
DEFAULT_HIGH_WATER_MARK));
bootstrap
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2150,6 +2150,13 @@ public void setRankAtStartup(int rank) {
}
}

/**
* @param blocksize the block size to set
*/
public void setBlocksize(int blocksize) {
pojo.setBlockSize(blocksize);
}

/**
* @param filename the filename to set
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public static Transfer nodeToNewTransfer(ObjectNode object) {
new Transfer(null, null, -1, false, null, null, 65536);
defaultTransfer.setRequester(serverName());
defaultTransfer.setOwnerRequest(serverName());
defaultTransfer.setBlockSize(65536);
defaultTransfer.setBlockSize(Configuration.configuration.getBlockSize());
defaultTransfer.setTransferInfo("");
defaultTransfer.setStart(new Timestamp(DateTime.now().getMillis()));
final Transfer transfer = parseNode(object, defaultTransfer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ public void request(RequestPacket packet)
}
// Receiver can specify a rank different from database
setRankAtStartupFromRequest(packet, runner);
runner.setBlocksize(packet.getBlocksize());
try {
runner.update();
} catch (WaarpDatabaseException ignored) {
// Ignore
}
logger.debug(
"Filesize: " + packet.getOriginalSize() + ':' + runner.isSender());
boolean shouldInformBack = false;
Expand Down Expand Up @@ -287,6 +293,8 @@ private RequestPacket computeBlockSizeFromRequest(RequestPacket packet,
}
// Check if the blocksize is greater than local value
if (Configuration.configuration.getBlockSize() < blocksize) {
logger.warn("Blocksize is greater than allowed {} < {}",
Configuration.configuration.getBlockSize(), blocksize);
blocksize = Configuration.configuration.getBlockSize();
final String sep = localChannelReference.getPartner().getSeperator();
packet = new RequestPacket(packet.getRulename(), packet.getMode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ public abstract class ScenarioBase extends TestAbstract {
private static final String TMP_R66_CONFIG_R1 =
"/tmp/R66/scenario_1_2_3/" + SERVER_1_REWRITTEN_XML;
public static int NUMBER_FILES = 50;
public static int LARGE_SIZE = 2000000;
public static int BLOCK_SIZE = 8192;

private static int r66Pid1 = 999999;
private static int r66Pid2 = 999999;
private static int r66Pid3 = 999999;
Expand Down Expand Up @@ -368,7 +371,7 @@ public void test011_SendToItself() throws IOException, InterruptedException {
Assume.assumeNotNull(networkTransaction);
File baseDir = new File("/tmp/R66/scenario_1_2_3/R1/out/");
final File totest =
generateOutFile(baseDir.getAbsolutePath() + "/testTask.txt", 1000);
generateOutFile(baseDir.getAbsolutePath() + "/testTask.txt", 100);
final R66Future future = new R66Future(true);
final SubmitTransfer transaction =
new SubmitTransfer(future, "server1-ssl", "testTask.txt", "rule3",
Expand All @@ -394,7 +397,8 @@ public void test012_MultipleSendsSync()
Assume.assumeNotNull(networkTransaction);
File baseDir = new File("/tmp/R66/scenario_1_2_3/R2/out/");
File fileOut = new File(baseDir, "hello");
final File outHello = generateOutFile(fileOut.getAbsolutePath(), 100000);
final File outHello =
generateOutFile(fileOut.getAbsolutePath(), LARGE_SIZE);
ArrayList<R66Future> futures = new ArrayList<R66Future>(NUMBER_FILES);
ExecutorService executorService =
Executors.newFixedThreadPool(NUMBER_FILES);
Expand All @@ -406,7 +410,7 @@ public void test012_MultipleSendsSync()
final TestRecvThroughClient transaction =
new TestRecvThroughClient(future, handler, "server2", "hello",
"recvthrough", "Test Multiple RecvThrough",
true, 8192, networkTransaction);
true, BLOCK_SIZE, networkTransaction);
transaction.setNormalInfoAsWarn(false);
executorService.execute(transaction);
}
Expand All @@ -418,11 +422,13 @@ public void test012_MultipleSendsSync()
assertTrue(future.isSuccess());
}
long timestop = System.currentTimeMillis();
logger
.warn("RecvThrough {} files from R2" + " ({} seconds, {} per seconds)",
NUMBER_FILES, (timestop - timestart) / 1000,
NUMBER_FILES * 1000 / (timestop - timestart));
logger.warn(
"RecvThrough {} files from R2 ({} seconds, {} per seconds) of " +
"size {} with block size {}", NUMBER_FILES,
(timestop - timestart) / 1000,
NUMBER_FILES * 1000 / (timestop - timestart), LARGE_SIZE, BLOCK_SIZE);
outHello.delete();
FileUtils.forceDeleteRecursiveDir(baseDir);
logger.warn("End {}", Processes.getCurrentMethodName());
}

Expand Down Expand Up @@ -511,6 +517,48 @@ public void test04_5000_MultipleSends()
Thread.sleep(5000);
}

@Test
public void test04_5000_MultipleSends_ChangingBlockSize()
throws IOException, InterruptedException {
Assume.assumeTrue("If the Long term tests are allowed",
SystemPropertyUtil.get(IT_LONG_TEST, false));
int lastNumber = NUMBER_FILES;
NUMBER_FILES = 800;
BLOCK_SIZE = 16 * 1024;
test012_MultipleSendsSync();
// Extra sleep to check correctness if necessary on Logs
Thread.sleep(1000);
// Ensure the last send is ok
test011_SendToItself();
// Extra sleep to check correctness if necessary on Logs
Thread.sleep(1000);
BLOCK_SIZE = 64 * 1024;
test012_MultipleSendsSync();
// Extra sleep to check correctness if necessary on Logs
Thread.sleep(1000);
// Ensure the last send is ok
test011_SendToItself();
// Extra sleep to check correctness if necessary on Logs
Thread.sleep(1000);
BLOCK_SIZE = 128 * 1024;
test012_MultipleSendsSync();
// Extra sleep to check correctness if necessary on Logs
Thread.sleep(1000);
// Ensure the last send is ok
test011_SendToItself();
// Extra sleep to check correctness if necessary on Logs
Thread.sleep(1000);
BLOCK_SIZE = 512 * 1024;
test012_MultipleSendsSync();
// Extra sleep to check correctness if necessary on Logs
Thread.sleep(1000);
// Ensure the last send is ok
test011_SendToItself();
// Extra sleep to check correctness if necessary on Logs
Thread.sleep(5000);
NUMBER_FILES = lastNumber;
}

private void waitForAllDone(DbTaskRunner runner) {
while (true) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,51 @@ public void test5_DirectTransfer() throws Exception {
assertEquals("Errors should be 0", 0, error);
}

@Test
public void test5_DirectTransferMultipleBlockSize() throws Exception {
final File totest = generateOutFile("/tmp/R66/out/testTask.txt", 1000000);
final ExecutorService executorService = Executors.newCachedThreadPool();
final int nb = 10;
final R66Future[] arrayFuture = new R66Future[nb];
logger.warn("Start Test of DirectTransfer");
final long time1 = System.currentTimeMillis();
for (int i = 0; i < nb; i++) {
arrayFuture[i] = new R66Future(true);
final TestTransferNoDb transaction =
new TestTransferNoDb(arrayFuture[i], "hostas", "testTask.txt",
"rule3", "Test SendDirect Small", true,
8192 * (i + 1) * 2, DbConstantR66.ILLEGALVALUE,
networkTransaction);
executorService.execute(transaction);
}
int success = 0;
int error = 0;
for (int i = 0; i < nb; i++) {
arrayFuture[i].awaitOrInterruptible();
if (arrayFuture[i].getRunner() != null) {
logger.warn("{} {}", arrayFuture[i].getRunner().getBlocksize(),
8192 * (i + 1) * 2);
assertTrue(
arrayFuture[i].getRunner().getBlocksize() <= 8192 * (i + 1) * 2);
assertTrue(arrayFuture[i].getRunner().getBlocksize() <=
Configuration.configuration.getBlockSize());
dbTaskRunners.add(arrayFuture[i].getRunner());
}
if (arrayFuture[i].isSuccess()) {
success++;
} else {
error++;
}
}
final long time2 = System.currentTimeMillis();
logger.warn("Success: " + success + " Error: " + error + " NB/s: " +
success * 1000 / (time2 - time1));
executorService.shutdown();
totest.delete();
assertEquals("Success should be total", nb, success);
assertEquals("Errors should be 0", 0, error);
}

@Test
public void test5_DirectTransferThroughId() throws Exception {
final File totest = generateOutFile("/tmp/R66/out/testTask.txt", 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
<globaldigest>True</globaldigest>
<delaycommand>5000</delaycommand>
<runlimit>600</runlimit>
<blocksize>1048576</blocksize>
</limit>
<db>
<dbdriver>h2</dbdriver>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@
<digest>5</digest>
<globaldigest>True</globaldigest>
<delaycommand>5000</delaycommand>
<runlimit>600</runlimit>
<runlimit>800</runlimit>
<blocksize>1048576</blocksize>
</limit>
<db>
<dbdriver>XXXDRIVERXXX</dbdriver>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
<digest>5</digest>
<globaldigest>True</globaldigest>
<runlimit>1000</runlimit>
<blocksize>1048576</blocksize>
</limit>
<db>
<dbdriver>h2</dbdriver>
Expand Down
2 changes: 2 additions & 0 deletions doc/waarp-r66/source/changes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ Correctifs
request [`#42 <https://github.com/waarp/Waarp-All/pull/42>`__])
- Correction de l'authentification HMAC de l'API REST v2 (pull
request [`#43 <https://github.com/waarp/Waarp-All/pull/43>`__])
- Correction d'un bug sur la taille des paquets (pull
request [`#45 <https://github.com/waarp/Waarp-All/pull/45>`__])

Waarp R66 3.3.3 (2020-05-07)
============================
Expand Down

0 comments on commit 07caa8a

Please sign in to comment.