Skip to content

Commit

Permalink
Added CRC32 check for full data
Browse files Browse the repository at this point in the history
  • Loading branch information
guyinyou committed Oct 10, 2023
1 parent 38d3d5d commit 7c0da89
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 15 deletions.
14 changes: 14 additions & 0 deletions common/src/main/java/org/apache/rocketmq/common/UtilAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,20 @@ public static int crc32(byte[] array, int offset, int length) {
return (int) (crc32.getValue() & 0x7FFFFFFF);
}

public static int crc32(ByteBuffer byteBuffer) {
CRC32 crc32 = new CRC32();
crc32.update(byteBuffer);
return (int) (crc32.getValue() & 0x7FFFFFFF);
}

public static int crc32(ByteBuffer[] byteBuffers) {
CRC32 crc32 = new CRC32();
for (ByteBuffer buffer : byteBuffers) {
crc32.update(buffer);
}
return (int) (crc32.getValue() & 0x7FFFFFFF);
}

public static String bytes2string(byte[] src) {
char[] hexChars = new char[src.length * 2];
for (int j = 0; j < src.length; j++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class MessageConst {
public static final String PROPERTY_TIMER_DEL_UNIQKEY = "TIMER_DEL_UNIQKEY";
public static final String PROPERTY_TIMER_DELAY_LEVEL = "TIMER_DELAY_LEVEL";
public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
public static final String PROPERTY_CRC32 = "__CRC32#";

/**
* properties for DLQ
Expand Down Expand Up @@ -155,5 +156,6 @@ public class MessageConst {
STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
STRING_HASH_SET.add(PROPERTY_CRC32);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.common.message;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
Expand Down Expand Up @@ -152,6 +153,34 @@ public static Map<String, String> decodeProperties(ByteBuffer byteBuffer) {
return null;
}

public static void createCrc32(final ByteBuffer input, int crc32) {
input.put(MessageConst.PROPERTY_CRC32.getBytes());
input.put((byte) NAME_VALUE_SEPARATOR);
for (int i = 0; i < 10; i++) {
byte b = '0';
if (crc32 > 0) {
b += (byte) (crc32 % 10);
crc32 /= 10;
}
input.put(b);
}
input.put((byte) PROPERTY_SEPARATOR);
}

public static void createCrc32(final ByteBuf input, int crc32) {
input.writeBytes(MessageConst.PROPERTY_CRC32.getBytes());
input.writeByte((byte) NAME_VALUE_SEPARATOR);
for (int i = 0; i < 10; i++) {
byte b = '0';
if (crc32 > 0) {
b += (byte) (crc32 % 10);
crc32 /= 10;
}
input.writeByte(b);
}
input.writeByte((byte) PROPERTY_SEPARATOR);
}

public static MessageExt decode(ByteBuffer byteBuffer) {
return decode(byteBuffer, true, true, false);
}
Expand Down Expand Up @@ -601,9 +630,6 @@ public static String messageProperties2String(Map<String, String> properties) {
sb.append(value);
sb.append(PROPERTY_SEPARATOR);
}
if (sb.length() > 0) {
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.apache.rocketmq.common.TopicFilterType;

import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;

public class MessageExtBrokerInner extends MessageExt {
private static final long serialVersionUID = 7256001576878700634L;
private String propertiesString;
Expand Down Expand Up @@ -55,6 +58,52 @@ public void setPropertiesString(String propertiesString) {
this.propertiesString = propertiesString;
}


public void deleteProperty(String name) {
super.clearProperty(name);
if (propertiesString != null) {
int idx0 = 0;
int idx1;
int idx2;
idx1 = propertiesString.indexOf(name, idx0);
if (idx1 != -1) {
// cropping may be required
StringBuilder stringBuilder = new StringBuilder(propertiesString.length());
while (true) {
int startIdx = idx0;
while (true) {
idx1 = propertiesString.indexOf(name, startIdx);
if (idx1 == -1) {
break;
}
startIdx = idx1 + name.length();
if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == PROPERTY_SEPARATOR) {
if (propertiesString.length() > idx1 + name.length()
&& propertiesString.charAt(idx1 + name.length()) == NAME_VALUE_SEPARATOR) {
break;
}
}
}
if (idx1 == -1) {
// there are no characters that need to be skipped. Append all remaining characters.
stringBuilder.append(propertiesString, idx0, propertiesString.length());
break;
}
// there are characters that need to be cropped
stringBuilder.append(propertiesString, idx0, idx1);
// move idx2 to the end of the cropped character
idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + name.length() + 1);
// all subsequent characters will be cropped
if (idx2 == -1) {
break;
}
idx0 = idx2 + 1;
}
this.setPropertiesString(stringBuilder.toString());
}
}
}

public long getTagsCode() {
return tagsCode;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.apache.rocketmq.common;

import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class MessageExtBrokerInnerTest {
@Test
public void testDeleteProperty() {
MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
String propertiesString = "";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyA\u0001ValueA";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyA\u0001ValueA\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");

propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");

propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");

propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001");

propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA";
messageExtBrokerInner.setPropertiesString(propertiesString);
messageExtBrokerInner.deleteProperty("KeyA");
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA");
}
}
69 changes: 64 additions & 5 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,16 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,
byteBuffer.get(bytesContent, 0, bodyLen);

if (checkCRC) {
int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
if (crc != bodyCRC) {
log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
return new DispatchRequest(-1, false/* success */);
/**
* When the forceVerifyPropCRC = false,
* use original bodyCrc validation.
*/
if (!this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
if (crc != bodyCRC) {
log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
return new DispatchRequest(-1, false/* success */);
}
}
}
} else {
Expand Down Expand Up @@ -527,6 +533,43 @@ public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,
}
}

if (checkCRC) {
/**
* When the forceVerifyPropCRC = true,
* Crc verification needs to be performed on the entire message data (excluding the length reserved at the tail)
*/
if (this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
int expectedCRC = -1;
if (propertiesMap != null) {
String crc32Str = propertiesMap.get(MessageConst.PROPERTY_CRC32);
if (crc32Str != null) {
expectedCRC = 0;
for (int i = crc32Str.length() - 1; i >= 0; i--) {
int num = crc32Str.charAt(i) - '0';
expectedCRC *= 10;
expectedCRC += num;
}
}
}
if (expectedCRC > 0) {
ByteBuffer tmpBuffer = byteBuffer.duplicate();
tmpBuffer.position(tmpBuffer.position() - totalSize);
tmpBuffer.limit(tmpBuffer.position() + totalSize - this.defaultMessageStore.getMessageStoreConfig().getCrc32ReservedLength());
int crc = UtilAll.crc32(tmpBuffer);
if (crc != expectedCRC) {
log.warn(
"CommitLog#checkAndDispatchMessage: failed to check message CRC, expected "
+ "CRC={}, actual CRC={}", bodyCRC, crc);
return new DispatchRequest(-1, false/* success */);
}
} else {
log.warn(
"CommitLog#checkAndDispatchMessage: failed to check message CRC, not found CRC in properties");
return new DispatchRequest(-1, false/* success */);
}
}
}

int readLength = MessageExtEncoder.calMsgLength(messageVersion, sysFlag, bodyLen, topicLen, propertiesLength);
if (totalSize != readLength) {
doNothingForDeadCode(reconsumeTimes);
Expand Down Expand Up @@ -826,9 +869,10 @@ public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBroke
if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
msg.setStoreTimestamp(System.currentTimeMillis());
}

// Set the message body CRC (consider the most appropriate setting on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// delete crc32 properties if exist
msg.deleteProperty(MessageConst.PROPERTY_CRC32);
// Back to Results
AppendMessageResult result = null;

Expand Down Expand Up @@ -1742,6 +1786,7 @@ class DefaultAppendMessageCallback implements AppendMessageCallback {
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
// Store the message content
private final ByteBuffer msgStoreItemMemory;
private final int crc32ReservedLength = MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1;

DefaultAppendMessageCallback() {
this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
Expand Down Expand Up @@ -1815,6 +1860,13 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer
pos += 8 + 4 + 8 + ipLen;
// refresh store time stamp in lock
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
// 18 CRC32
int checkSize = msgLen - crc32ReservedLength;
ByteBuffer tmpBuffer = preEncodeBuffer.duplicate();
tmpBuffer.limit(tmpBuffer.position() + checkSize);
int crc32 = UtilAll.crc32(tmpBuffer);
tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength);
MessageDecoder.createCrc32(tmpBuffer, crc32);

final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS");
Expand Down Expand Up @@ -1896,6 +1948,13 @@ public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer
pos += 8 + 4 + 8 + bornHostLength;
// refresh store time stamp in lock
messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp());
//append crc32
int checkSize = msgLen - crc32ReservedLength;
ByteBuffer tmpBuffer = messagesByteBuff.duplicate();
tmpBuffer.position(msgPos).limit(msgPos + checkSize);
int crc32 = UtilAll.crc32(tmpBuffer);
messagesByteBuff.position(msgPos + checkSize);
MessageDecoder.createCrc32(messagesByteBuff, crc32);

putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen;
queueOffset++;
Expand Down
Loading

0 comments on commit 7c0da89

Please sign in to comment.