Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup redis stream implementation #171

Merged
merged 44 commits into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8e26d7a
cleanup redis stream implementation
Aug 21, 2020
a814d28
stream tests [full ci]
Aug 23, 2020
cf9be34
change timeout handling [full ci]
Aug 23, 2020
d61a978
tune [full ci]
Aug 23, 2020
c277427
tune [full ci]
Aug 23, 2020
15c772e
[full ci]
Aug 23, 2020
5ce72e5
troubleshoot [full ci]
Aug 23, 2020
2b14502
troubleshoot [full ci]
Aug 23, 2020
1ff7231
troubleshoot [full ci]
Aug 23, 2020
9238e7d
troubleshoot [full ci]
Aug 23, 2020
6dd9c66
troubleshoot [full ci]
Aug 23, 2020
a4844f6
troubleshoot [full ci]
Aug 23, 2020
f89d2de
troubleshoot [full ci]
Aug 23, 2020
99deb7e
troubleshoot [full ci]
Aug 23, 2020
e5577e9
troubleshoot [full ci]
Aug 23, 2020
46d232b
troubleshoot [full ci]
Aug 23, 2020
f43acb0
troubleshoot [full ci]
Aug 23, 2020
088b692
troubleshoot [full ci]
Aug 23, 2020
e75f3de
troubleshoot [full ci]
Aug 23, 2020
a416e64
troubleshoot [full ci]
Aug 23, 2020
840941b
troubleshoot [full ci]
Aug 23, 2020
6479d2f
troubleshoot [full ci]
Aug 23, 2020
4c8eb8b
troubleshoot [full ci]
Aug 23, 2020
02a1376
troubleshoot [full ci]
Aug 23, 2020
10e72f3
troubleshoot [full ci]
Aug 23, 2020
ffb82f5
troubleshoot [full ci]
Aug 23, 2020
2380306
troubleshoot [full ci]
Aug 23, 2020
cd05545
troubleshoot [full ci]
Aug 23, 2020
bd7375b
troubleshoot [full ci]
Aug 23, 2020
fdff9c0
troubleshoot [full ci]
Aug 23, 2020
dc24243
troubleshoot [full ci[
Aug 23, 2020
f516173
troubleshoot [full ci]
Aug 23, 2020
595c865
troubleshoot [full ci]
Aug 23, 2020
084f529
troubleshoot [full ci]
Aug 23, 2020
5ac26e7
troubleshoot [full ci]
Aug 23, 2020
9611681
troubleshoot [full ci]
Aug 23, 2020
4508000
troubleshoot [full ci]
Aug 24, 2020
f2cfb21
cleanup [full ci]
Aug 24, 2020
151e6e6
update docs
Aug 24, 2020
28fea87
fix refactor
Aug 24, 2020
0d9577f
bump version
Aug 24, 2020
bdc46fc
fix array push
Aug 24, 2020
53a7123
style
Aug 24, 2020
3da90ad
style
Aug 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 32 additions & 22 deletions docs/docs/Module_Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,26 @@ Application.create()
| Name | Description |
|------|-------------|
| host | the HTTP endpoint to connect to |
| port | the port to connect to. Default is 6379 |
| db | index of the database to connect to |
| _port_ | the port to connect to. Default is 6379 |
| _db_ | index of the database to connect to. Default is 0 |
| _password_ | the password to use to connect to Redis. Default is no password |
| _encoder_ | the encoder to use when converting the payload to a byte array. This defaults to the `NullMessageEncoder` which only supports Buffers (=byte arrays) being published |
| _typeMapper_ | only required if correct type information needs to be emitted |
| _base64Encode_ | determines if buffers should be stored in base64 encoding. Default is true |

### Metrics

| Name | Description | Type | Tags |
| ------------------------------------------- | ----------- | ---- | ---- |
| cookie_cutter.redis_client.get | A call to get a value | `increment` | `type`, `db`, `result`
| cookie_cutter.redis_client.set | A call to set a value | `increment` | `type`, `db`, `result`
| cookie_cutter.redis_client.xadd | A call to add a value to a stream | `increment` | `type`, `db`, `result`
| cookie_cutter.redis_client.xread | A call to read from a stream | `increment` | `type`, `db`, `result`
| cookie_cutter.redis_client.xreadgroup | A call to read from a a stream as part of a consumer group | `increment` | `type`, `db`, `result`
| cookie_cutter.redis_client.xgroup | A call to create a consumer group | `increment` | `type`, `db`, `result`
| cookie_cutter.redis_client.xack | A call to acknowledge a message in a stream | `increment` | `type`, `db`, `result`
| cookie_cutter.redis_client.xpending | A call to query pending messages list of a stream | `increment` | `type`, `db`, `result`
| cookie_cutter.redis_client.xclaim | A call to claim a pending message of a stream | `increment` | `type`, `db`, `result`

## redisStreamSource

Expand All @@ -54,11 +63,9 @@ The `redisStreamSource` function creates a new input source that receives messag
Application.create()
.input()
.add(redisStreamSource({
host: "localhost",
port: 6379,
db: 0,
readStreams: ["stream1", "stream2", "stream3"],
consumerGroup: "consumer-group-1",
host: "localhost",
streams: ["stream1", "stream2", "stream3"],
consumerGroup: "consumer-group-1",
}))
.done()
// ...
Expand All @@ -70,26 +77,30 @@ The `redisStreamSource` function creates a new input source that receives messag
| Name | Description |
|------|-------------|
| host | the HTTP endpoint to connect to |
| port | the port to connect to. Default is 6379 |
| db | index of the database to connect to |
| _encoder_ | the encoder to use when converting the payload to a byte array. This defaults to the `NullMessageEncoder` which only supports Buffers (=byte arrays) being published |
| _port_ | the port to connect to. Default is 6379 |
| _db_ | index of the database to connect to |
| _password_ | the password to use to connect to Redis. Default is no password |
| _typeMapper_ | only required if correct type information needs to be emitted |
| readStreams | a list of stream names to consume from |
| encoder | the encoder to use when converting the payload to a byte array. |
| streams | a list of stream names to consume from |
| consumerGroup | the name of the consumer group to join |
| _consumerId_ | the id of consumer to use, the default value is a generated guid |
| _consumerGroupStartId_ | the ID of the last item in the stream to consider already delivered, the default value is `$` (the ID of the last item in the stream) |
| _blockTimeout_ | the number of milliseconds we want to block before timing out, the default values is 100 ms |
| _idleTimeout_ | the minimum number of milliseconds of idle time a pending message should have before we try to claim it, the default value is 30000 ms (30s) |
| _batchSize_ | the number of messages receive at a time when consuming streams, the default values is 10 |
| _reclaimMessageInterval_ | defines how often a client is checking for pending messages from dead consumers and tries to reclaim then |
| _base64Encode_ | determines if buffers should be stored in base64 encoding. Default is true |

### Metadata

The following metadata is available in the message handler via `ctx.metadata<T>(key)`

| name | description |
|--------------------------|-------------|
| RedisMetadata.StreamName | the name of the stream the messages was received from |
| RedisMetadata.StreamId | the stream id of the message that was received |
| RedisMetadata.Stream | the name of the stream the messages was received from |
| RedisMetadata.MessageId | the id of the message that was received |
| RedisMetadata.ConsumerId | the id of the consumer group |

### Metrics

Expand All @@ -109,10 +120,8 @@ The 'redisStreamSink' function creates an output sink that handles published mes
Application.create()
.output()
.published(redisStreamSink({
host: "localhost",
port: 6379,
db: 0,
writeStream: "streamName",
host: "localhost",
stream: "streamName",
}))
.done()
// ...
Expand All @@ -123,19 +132,20 @@ Application.create()
| Name | Description |
|------|-------------|
| host | the HTTP endpoint to connect to |
| port | the port to connect to. Default is 6379 |
| db | index of the database to connect to |
| _encoder_ | the encoder to use when converting the payload to a byte array. This defaults to the `NullMessageEncoder` which only supports Buffers (=byte arrays) being published |
| _port_ | the port to connect to. Default is 6379 |
| _db_ | index of the database to connect to |
| encoder | the encoder to use when converting the payload to a byte array. |
| _typeMapper_ | only required if correct type information needs to be emitted |
| writeStream | the name of the stream to publish to if no other stream name was specified in the message handler |
| stream | the name of the stream to publish to if no other stream name was specified in the message handler |
| maxStreamLength | if defined will limit the length of a stream by truncating it when new messages are published. Default is off |

### Metadata

The following metadata is available in the message handler via `ctx.publish`

| name | description |
|--------------------------|-------------|
| RedisMetadata.StreamName | the name of the stream to publish to |
| RedisStreamMetadata.Stream | the name of the stream to publish to |

### Metrics

Expand Down
4 changes: 2 additions & 2 deletions packages/redis/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@walmartlabs/cookie-cutter-redis",
"version": "1.3.0-beta.9",
"version": "1.3.0-beta.10",
"license": "Apache-2.0",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand All @@ -20,7 +20,7 @@
"@walmartlabs/cookie-cutter-core": "^1.3.0-beta"
},
"devDependencies": {
"@types/redis": "2.8.16",
"@types/redis": "2.8.25",
"@walmartlabs/cookie-cutter-core": "^1.3.0-beta"
},
"scripts": {
Expand Down
Loading