Custom Stages in Redis Connect are used when custom code is needed for user-specific transformations, de-tokenization, or any other custom task that must run before the source data is passed along to the final WRITE stage and persisted in the Redis Enterprise database. Redis Connect is an event-driven workflow built on a stage-driven architecture, in which stages form the pipeline for any data flow from a source to a Redis Enterprise target. A pipeline can use built-in stages only, such as REDIS_HASH_SINK, or it can run a Custom Stage first, e.g. TO_UPPER_CASE, and then apply the final write stage, e.g. REDIS_HASH_SINK. This demo explains how to write a very simple custom transformation that converts the input source records to UPPER CASE values and passes them along to the final write stage.
The connect-core Maven module must be available as a dependency before you write the Custom Stage class. Please see the POM for an example.
Create a Custom Stage class e.g. TransformValueToUpperCaseStage which extends the BaseCustomStageHandler class.
We must override the following method in order to write the custom stage.
onEvent(ChangeEventDTO changeEvent)
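As a rough illustration of what such an override might do, the following is a minimal, self-contained sketch. It does not use the real connect-core classes: SketchChangeEvent and SketchStageHandler are hypothetical stand-ins for ChangeEventDTO and BaseCustomStageHandler, and the assumption that an event exposes its columns as a map of values is made purely for illustration. Check the connect-core API for the actual accessors before adapting it.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the event passed to a stage. The real
// ChangeEventDTO from connect-core is not reproduced here.
final class SketchChangeEvent {
    private final Map<String, Object> values;

    SketchChangeEvent(Map<String, Object> values) {
        this.values = new LinkedHashMap<>(values); // mutable copy of the row
    }

    Map<String, Object> getValues() {
        return values;
    }
}

// Hypothetical stand-in for BaseCustomStageHandler: it only stores the
// constructor arguments and declares the method a stage must override.
abstract class SketchStageHandler {
    protected final String jobId;
    protected final String jobType;
    protected final String stageName;

    SketchStageHandler(String jobId, String jobType, String stageName) {
        this.jobId = jobId;
        this.jobType = jobType;
        this.stageName = stageName;
    }

    abstract void onEvent(SketchChangeEvent changeEvent);
}

// The custom stage: upper-cases every String value and leaves other types alone.
final class UpperCaseStageSketch extends SketchStageHandler {
    UpperCaseStageSketch(String jobId, String jobType, String stageName) {
        super(jobId, jobType, stageName);
    }

    @Override
    void onEvent(SketchChangeEvent changeEvent) {
        changeEvent.getValues().replaceAll((column, value) ->
                value instanceof String
                        ? ((String) value).toUpperCase(Locale.ROOT)
                        : value);
    }
}
```

Locale.ROOT is used so that the result does not depend on the default locale of the machine running the job; non-string values such as numeric keys pass through unchanged.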
Create a CustomChangeEventHandlerFactory class which implements the ChangeEventHandlerFactory interface, and register it under the META-INF/services folder in a service configuration file whose name matches the fully qualified name (package and interface) of ChangeEventHandlerFactory.
At runtime, the Service Loader picks up this custom factory class through that ChangeEventHandlerFactory service configuration.
We must instantiate the CustomStage class within the getInstance() method e.g.
changeEventHandler = new ToUpperCase(jobId, jobType, jobPipelineStage);
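The factory and Service Loader steps above can be sketched as follows. This is an illustration under stated assumptions, not the connect-core API: SketchHandler, SketchHandlerFactory, and the three String parameters of getInstance are hypothetical stand-ins, while java.util.ServiceLoader is the standard JDK mechanism. For discovery to work in a real jar, the factory must be a public class with a public no-argument constructor, and its fully qualified name must be listed in a file under META-INF/services named after the fully qualified factory interface.

```java
import java.util.ServiceLoader;

// Hypothetical stand-in for the handler type returned by the factory.
interface SketchHandler {
    String describe();
}

// Hypothetical stand-in for ChangeEventHandlerFactory.
interface SketchHandlerFactory {
    SketchHandler getInstance(String jobId, String jobType, String stageName);
}

// Hypothetical custom stage, constructed the same way as in the step above.
final class ToUpperCaseSketch implements SketchHandler {
    private final String jobId;
    private final String jobType;
    private final String stageName;

    ToUpperCaseSketch(String jobId, String jobType, String stageName) {
        this.jobId = jobId;
        this.jobType = jobType;
        this.stageName = stageName;
    }

    @Override
    public String describe() {
        return stageName + " for " + jobId + " (" + jobType + ")";
    }
}

// The custom factory: returns the custom stage for the stage name it knows,
// and null for anything else so that other factories can be consulted.
final class CustomSketchFactory implements SketchHandlerFactory {
    @Override
    public SketchHandler getInstance(String jobId, String jobType, String stageName) {
        if ("TO_UPPER_CASE".equals(stageName)) {
            return new ToUpperCaseSketch(jobId, jobType, stageName);
        }
        return null;
    }
}

// How a host application could discover factories through the Service Loader.
final class FactoryLookupSketch {
    static SketchHandler resolve(String jobId, String jobType, String stageName) {
        for (SketchHandlerFactory factory : ServiceLoader.load(SketchHandlerFactory.class)) {
            SketchHandler handler = factory.getInstance(jobId, jobType, stageName);
            if (handler != null) {
                return handler;
            }
        }
        return null; // no registered factory recognised the stage name
    }
}
```

Without a matching META-INF/services entry on the classpath, FactoryLookupSketch.resolve finds no factories and returns null, which is also the symptom to look for when a custom stage is not picked up.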
Build the project and place the output jar into the extlib folder of Redis Connect. See an example here
Create the custom stage configuration in the job payload e.g. cdc-custom-job.json
For example,
{
  "index": 1,
  "stageName": "TO_UPPER_CASE",
  "userDefinedType": "CUSTOM"
}
After the Redis Connect job (loader or cdc) executes, you should see that the values of col1 and col2 in Redis have been transformed to UPPER CASE.
Troubleshooting using Java Debug Wire Protocol (JDWP)
- Start Redis Connect after supplying jdwp as JVM options
  - *nix OS
    - Edit ~/redis-connect/bin/redisconnect.conf and update REDISCONNECT_JAVA_OPTIONS
      REDISCONNECT_JAVA_OPTIONS="-XX:+HeapDumpOnOutOfMemoryError -Xms1g -Xmx2g -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"
  - Windows OS
    - Edit redis-connect\bin\redisconnect.cmd and update REDISCONNECT_JAVA_OPTIONS
      set REDISCONNECT_JAVA_OPTIONS=-XX:+HeapDumpOnOutOfMemoryError -Xms1g -Xmx2g -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
  - Docker
    - Pass REDISCONNECT_JAVA_OPTIONS as an environment variable
      -e REDISCONNECT_JAVA_OPTIONS="-Xms1g -Xmx2g -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"
- Start the Custom Stage client code in DEBUG mode with the same HOST and PORT, then add a breakpoint. See an example DEBUG configuration
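If the debugger cannot attach, one thing worth checking is whether the JVM was actually started with the jdwp agent. The sketch below is a hypothetical helper (JdwpCheck is not part of Redis Connect) that inspects JVM arguments using the standard RuntimeMXBean API and extracts the address value from the agent option. It reports on whichever JVM runs it, so it is only informative when invoked inside the Redis Connect process, for example temporarily from custom stage code.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Optional;

// Hypothetical troubleshooting helper; not part of Redis Connect.
final class JdwpCheck {

    // Returns the first JVM argument that enables the jdwp agent, if any.
    static Optional<String> findJdwpOption(List<String> jvmArgs) {
        return jvmArgs.stream()
                .filter(arg -> arg.startsWith("-agentlib:jdwp"))
                .findFirst();
    }

    // Extracts the address sub-option, e.g. "*:5005", from
    // "-agentlib:jdwp=transport=dt_socket,...,address=*:5005".
    static Optional<String> addressOf(String jdwpOption) {
        int firstEquals = jdwpOption.indexOf('=');
        if (firstEquals < 0) {
            return Optional.empty();
        }
        for (String pair : jdwpOption.substring(firstEquals + 1).split(",")) {
            if (pair.startsWith("address=")) {
                return Optional.of(pair.substring("address=".length()));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Arguments the current JVM was launched with (excludes main-method args).
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        Optional<String> option = findJdwpOption(jvmArgs);
        if (option.isPresent()) {
            System.out.println("jdwp enabled, address=" + addressOf(option.get()).orElse("(not set)"));
        } else {
            System.out.println("jdwp agent not found in JVM arguments");
        }
    }
}
```

The port reported here is the one the DEBUG configuration on the client side needs to match.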