Skip to content

Commit

Permalink
fix rocketmq message header properties garbled characters issue (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
HScarb authored Oct 27, 2021
1 parent 3f93954 commit 30fb81a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Release Notes.
* Fix instrumentation v2 API doesn't work for constructor instrumentation.
* Add plugin to support okhttp 2.x
* Optimize okhttp 3.x 4.x plugin to get span time cost precisely
* Adapt message header properties of RocketMQ 4.9.x

#### Documentation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,13 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
while (next.hasNext()) {
next = next.next();
if (!StringUtil.isEmpty(next.getHeadValue())) {
if (properties.length() > 0 && properties.charAt(properties.length() - 1) != PROPERTY_SEPARATOR) {
// adapt for RocketMQ 4.9.x or later
properties.append(PROPERTY_SEPARATOR);
}
properties.append(next.getHeadKey());
properties.append(NAME_VALUE_SEPARATOR);
properties.append(next.getHeadValue());
properties.append(PROPERTY_SEPARATOR);
}
}
requestHeader.setProperties(properties.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.apache.skywalking.apm.plugin.rocketMQ.v4;

import java.util.List;
import java.util.Map;

import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.skywalking.apm.agent.core.context.SW8ExtensionCarrierItem;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -44,10 +48,17 @@

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.when;

import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;

@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class MessageSendInterceptorTest {
Expand Down Expand Up @@ -108,15 +119,22 @@ public void setSkyWalkingDynamicField(Object value) {
CommunicationMode.ASYNC,
null
};
when(messageRequestHeader.getProperties()).thenReturn("");
when(message.getTags()).thenReturn("TagA");
stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA" + PROPERTY_SEPARATOR);
}

@Test
public void testSendMessage() throws Throwable {
messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
messageSendInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);

Map<String, String> tags = MessageDecoder.string2messageProperties(
((SendMessageRequestHeader) arguments[3]).getProperties());
// check original header of TAGS
assertThat(tags.get("TAGS"), is("TagA"));
// check skywalking header
assertTrue(tags.containsKey(SW8ExtensionCarrierItem.HEADER_NAME));

assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
Expand All @@ -127,15 +145,27 @@ public void testSendMessage() throws Throwable {
SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ_PRODUCER);
SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
verify(messageRequestHeader).setProperties(anyString());
verify(callBack).setSkyWalkingDynamicField(Matchers.any());
}

@Test
public void testSendMessageNew() throws Throwable {
stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA");
testSendMessage();
}

@Test
public void testSendMessageWithoutCallBack() throws Throwable {
messageSendInterceptor.beforeMethod(enhancedInstance, null, argumentsWithoutCallback, null, null);
messageSendInterceptor.afterMethod(enhancedInstance, null, argumentsWithoutCallback, null, null);

Map<String, String> tags = MessageDecoder.string2messageProperties(
((SendMessageRequestHeader) argumentsWithoutCallback[3]).getProperties());
// check original header of TAGS
assertThat(tags.get("TAGS"), is("TagA"));
// check skywalking header
assertTrue(tags.containsKey(SW8ExtensionCarrierItem.HEADER_NAME));

assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
Expand All @@ -146,7 +176,25 @@ public void testSendMessageWithoutCallBack() throws Throwable {
SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ_PRODUCER);
SpanAssert.assertTag(mqSpan, 0, "127.0.0.1");
verify(messageRequestHeader).setProperties(anyString());
}

@Test
public void testSendMessageWithoutCallBackNew() throws Throwable {
stubMessageRequestHeader("TAGS" + NAME_VALUE_SEPARATOR + "TagA");
testSendMessageWithoutCallBack();
}

private void stubMessageRequestHeader(String properties) {
messageRequestHeader = mock(SendMessageRequestHeader.class, RETURNS_DEEP_STUBS);
doAnswer(invocation -> {
String val = (String) invocation.getArguments()[0];
when(messageRequestHeader.getProperties()).thenReturn(val);
return null;
}).when(messageRequestHeader).setProperties(anyString());
when(messageRequestHeader.getProperties()).thenCallRealMethod();
messageRequestHeader.setProperties(properties);

arguments[3] = messageRequestHeader;
argumentsWithoutCallback[3] = messageRequestHeader;
}
}

0 comments on commit 30fb81a

Please sign in to comment.