Skip to content

Commit

Permalink
Stabilize 1.3 (#202)
Browse files Browse the repository at this point in the history
* parent 598250b
author sklose <[email protected]> 1574703790 -0500
committer sklose <[email protected]> 1579188453 -0500

parent 598250b
author sklose <[email protected]> 1574703790 -0500
committer sklose <[email protected]> 1579188394 -0500

initial commit develop

* Bump up the versions for google cloud NPM packages (#11)

* Integration command exits with jest exit code (#40)

* Integration command exits with jest exit code [full ci]

* revert version bumps [full ci]

* Fix failing Kafka integration test (#43)

* Remove Deprecated new Buffer usage (#46)

* #12 remove deprecated new Buffer usage

* Support Changes in Metrics Tags for Prometheus (#38)

* Support Changes in Metrics Tags for Prometheus

* add integration test to .travis.yml [full ci]

* set jestTimeout to a larger value [full ci]

* testing change to integrate always returning 0

* [full ci]

* increasing time to wait for Prometheus to scrape

* [full ci]

* increase wait to 120 sec [full ci]

* adding localhost so Docker/Prom works for linux

* [full ci]

* look for ports starting with 300 [full ci]

* show all results from netstat [full ci]

* File service discovery for Prometheus targets [full ci]

* moving targets.json generation to integration file

* [full ci]

* [full ci]

* slimming down docker-compose.yml file

* [full ci]

* addressing comments

* [full ci]

* Clean up for review

* Fix IMetricsTags to ILabelValues conversion

* Addressing comments

* kubernetes: adjust logic for creating a watch (#48)

kubernetes: adjust logic for creating a watch
* update kubernetes-client to latest version on master that contains
various internal fixes for creation and handling of a watch
* adjust logic for startWatch to wrap watch function inside of a promise
and handle timeouts differently
* change callback for watch to not be async 

azure: fix build error to take in correct BufferEncoding and update kb calculations

* Revert commit of incorrect yarn.lock from #48 (#50)

yarn.lock was committed with private registry information. rebuild
yarn.lock in order to fix deployment issues from #48.

* kubernetes: fix kubernetes-client package build issue with Node 8 (#51)

kubernetes-client package requires use of node version greater than 8
but we require 8 in the travis ci build pipeline. as a temporary solution
till we deprecate Node 8 completely as part of #52 update the build pipeline
to use a separate package.v8.json which doesn't build it.

* build: fix grep check to only match if we're using node 8 (#53)

* kubernetes: update @kubernetes/client-node package to 0.11.1 (#54)

* kubernetes: add in additional logging details for watch errors (#56)

* improve timeout of k8s watch (#59)

* Allow opt-in to creating Azure queues dynamically on write (#55)

* core: update ConsoleLogger to correctly log out nested objects (#60)

core: update ConsoleLogger to correctly log out nested objects

The console logger renders out `[object Object]` if a field is an object
so destructure it into `.` delimited nested fields (e.g. foo.bar.baz)
* Update ConsoleLogger to take in options letting users select a maxDepth for log outputs

* kubernetes: create k8sPollSource that periodically polls for resources instead of using a watch (#61)

* Create `KubernetesBase` class that both poll and watch sources extend from
* Add in tests for `KubernetesPollSource`

* Add preprocessor concept for Azure Queues to support messages not following the envelope format (#65)

* fix missing bind when creating new azure queues (#63)

* fix lint errors (#67)

* refactor queue client to avoid exporting type from azure library as p… (#68)

* refactor queue client to avoid exporting type from azure library as part of public api

* kubernetes: add in timeout to poll request in case request to k8s hangs (#70)

* Fix bug when options are ignored in QueueClient (#69)

* Decorator returning class with empty string name (#73)

* Decorator returning class with empty string name
[full cli]

* remove export, add link to TS issue [full cli]

* bump version [full ci]

* core: do not overwrite pending promises for the backlog of queued items (#76)

It's possible to get into a state where we've hit our limit on items
that we can add to the bounded priority queue and a large backlog of
items are accumulated. Once we can begin to process that backlog after
a `whenNotFull` promise resolves, the first item in the backlog will
create a new `whenNotFull` and proceed to await it. Subsequent items in
backlog queue will do the same but overwrite the promise of the first
item leading to a chain of promises for backlogged items that are
unresolvable. This PR ensures that we never overwrite the
`whenNotFull` promise for backlogged queue items and we can resolve
them all eventually.

* core: version bump to 1.2.0-beta.11 (#77)

* Update Cookie Cutter Dependencies (#75)

* better throughput in RPC mode while guaranteeing correctness of state (#83)

* Update dependencies for Node 8 (#86)

* Update dependencies for Node 8

* add white space [full ci]

* remove white space [full ci]

* A few more deps updates [full ci]

Co-authored-by: Plamen Ivanov <[email protected]>

* Reveal Azure Blob & Queue Service URL params, to allow for pointing at a local emulator.  (#89)

* Way to Inspect 'Invalid' Messages (#82)

* Way to Inspect 'Invalid' Messages

* unit test

* remove annotator from unit test

* allow publishing from inside the invalid handler

* Updating docs

* impoving clarity of doc entry

* actual change of doc

* add failSpan if input validation fails for Serial

* Use custom error to signal no invalid msg handler

* Missed files

* Do not propagate NoInvalidHandlerError

* Remove custom error and add hasInvalid function

* add case in unit test

* more tests for ConventionBasedMessageDispatcher

* refactoring to simplify code

* addressing comments

Co-authored-by: Plamen Ivanov <[email protected]>

* Better Log Message when a Message Fails to Process and fix to upsertSproc SeqConErr details (#91)

* Better Log Message when a Message Fails to Process

* upsertSproc SeqConErr details fix

* use context's logger

Co-authored-by: Plamen Ivanov <[email protected]>

* Update CHANGELOG.md (#98)

* bump develop to 1.3 (#106)

* Add Dead Letter Queue to QueueInputSource (#93)

* Add Dead Letter Queue to QueueInputSource

* Addressing comments

* pass in a modified config to dead letter queue

* addressing comment

* changing API to expect values in milliseconds

* Updating docs

* Docs update with example dead letter queue config

* update package.jsoon

* doc nit

Co-authored-by: Plamen Ivanov <[email protected]>

* merge release/1.2 into develop (#112)

* upgrade dependencies due to vulnerabilities (#111)

* Remove support for node 8 (#113)

* core: unable to close bounded priority queue (#116)

core: fix issue with bounded priority queue not correctly supporting closing of queues
immediately after it was drained leading to potential Cannot read property 'resolve' of undefined errors due to whenNotFull being
undefined.

* ConcurrentMessageProcessor suppresses error details (#124)

* ConcurrentMessageProcessor suppresses error details

When message handling fails outside of the message handler the ConcurrentMessageHandler currently throws a generic Error that hides the underlying root cause error. This PR changes it to re-throw the original error, similar to what the SerialMessageProcessor is doing.

* Fix breaking API change in Azure Queues (#122)

* Fix breaking API change in Azure Queues (#121)

* Adding source unit to Config's timespanOf function

* Adding Azure Queue change and other fixes

Co-authored-by: Plamen Ivanov <[email protected]>

* DeadLetterQueue fixes

* bump versions

* rebase and bump version

Co-authored-by: Plamen Ivanov <[email protected]>

* Add RedisStreamSink & RedisStreamSource (#126)

* Add GCP PubSub Sink (#125)

* Bump Redis version to publish new package (#127)

do version bump missing in PR #126

* prometheus module should not throw an error when incrementing by 0 (#130)

merge back from master

* Prevent config.parse() output from being used as input (#134)

* multi cosmos collections (#81) (#117)

* fix MsSqlSink throws wrong error (#140)

* fix MsSqlSink throws wrong error

* Update package.json

* Update MssqlSink.ts

* Add AMQP Sink + Source (#136)

* Add AMQP Sink + Source

* add the actual Sink/Source files

* properly close the sink's connection

* Adding initialization and disposal to source

* Basic producer and consumer scripts.

* replace AsyncPipe with BoundedPriorityQueue

* Refactor to get correct produce/consume behavior

* improve connection call and add port as optional

* fix yaml.lock file

* Adding integration test

* add new line at end of yml file

* Add Copyright text and set "--passWithNoTest"

* add msg release listener [full ci]

* add AMQP integration to travis build

* fix .travis.yml [full ci]

* addressing comments

* Fix integration test/setup [full ci]

* Adding tracing [full ci]

* minor corrections

Co-authored-by: Plamen Ivanov <[email protected]>

* Add metrics to AMQP (#144)

* Add metrics to AMQP

* bump version

* switch to this.channel

* trigger [full ci]

* Adding periodic metrics

* Add metadata [full ci]

* Lint and style fix [full ci]

* Addressing comments

Co-authored-by: Plamen Ivanov <[email protected]>

* Add docs to AMQP package (#147)

* Add docs to AMQP package

* correcting module name

* Addressing comments

* Adding example files

* minor nits

* rename

* Simplify config

Co-authored-by: Plamen Ivanov <[email protected]>

* Add cookie-cutter-jaeger (#151)

* Add cookie-cutter-jaeger

* remove interface

* add lock

* update per comments

Co-authored-by: Marco Garcia <[email protected]>

* cookie-cutter-redis: add support for multiple streams (#155)

* cc-redis: add support for multiple streams

* fix test

* update spanLogAndSetTags

* update getPendingMessagesForConsumerGroup

* update RedisStreamSink

* rename test

* add tests

* add more tests

* update per feedback

* update xReadGroup

* full ci

Co-authored-by: Marco Garcia <[email protected]>

* cookie-cutter-redis: add metrics (#156)

* cookie-cutter-redis: add metrics

* full ci

* update docs

* update docs

* full ci

* remove array tag

Co-authored-by: Marco Garcia <[email protected]>

* Have separate queue capacity per priority level (#160)

* Have separate queue capacity per priority level

* Fix memory leak likely caused by promise chaining

* fixing floating promises

* white space change [full ci]

Co-authored-by: Plamen Ivanov <[email protected]>
Co-authored-by: Sebastian Klose <[email protected]>

* Fix lz4 error by adding resolution (#161)

* Fix lz4 error by adding resolution

* yarn.lock file change

Co-authored-by: Plamen Ivanov <[email protected]>

* Add new Jaeger package to README (#158)

* fix broken metrics, reclaim PEL messages less often (#163)

* fix pending list not fully drained on startup, added password config (#164)

* fix messages are acked on error (#168)

* check for failed acks to redis (#169)

* fix xReadGroup ignores all but first message from batch (#170)

* Implement IEncodedMessageEmbedder for ProtoMessageEncoder (#174)

* Implement IEncodedMessageEmbedder for
 ProtoMessageEncoder

* version change

* unit tests

Co-authored-by: Plamen Ivanov <[email protected]>

* cleanup redis stream implementation (#171)

* make some redis options nullable so the default value can be overwrit… (#177)

* troubleshoot redis stream issue (#179)

* Allow negative values in Prometheus histogram (#181)

Co-authored-by: Plamen Ivanov <[email protected]>

* Prevent BoundedPriorityQueue from deprioritizing waiting enqueue calls (#180)

* Prevent BoundedPriorityQueue from deprioritizing
waiting enqueue calls

* versioon bump

* handle floating promises

Co-authored-by: Plamen Ivanov <[email protected]>

* Fix backwards compatibility for ProtoMessageEncoder (#182)

* Version bump for Proto change (#183)

Co-authored-by: Plamen Ivanov <[email protected]>

* Change LogLevel from Error -> Warn when retrieving Kafka watermarks (#184)

* fix 'yarn audit' issues [full ci] (#187)

* fix sec vuln in node-fetch (#189)

* Add auth for amqp source and sink (#192)

* Add auth for amqp source and sink

* trigger [full ci]

* do not overwrite default creds [full ci]

* do not overwrite default creds [full ci]

* do not overwrite default creds [full ci]

* do not overwrite default creds [full ci]

* add missing new line [full ci]

* lint fix

* trigger ci [full ci]

* trigger ci

Co-authored-by: [email protected] <[email protected]>

* Add AMQP package to README (#193)

Co-authored-by: Plamen Ivanov <[email protected]>

* Add vhost support for amqp (#194)

* Add vhost support for amqp

* Add vhost support for amqp

* Add vhost support for amqp [full ci]

* Add vhost support for amqp

* Add vhost support for amqp

* Add vhost support for amqp

* version bump

Co-authored-by: [email protected] <[email protected]>

* kubernetes: adjust logging to be less verbose (#195)

* detect when kafkajs is stuck with stale broker metadata (#186)

* detect when kafkajs is stuck with stale broker metadata and terminate application

* lint

* Ensure RedisClient's "type" metric label is always a string (#196)

* Ensure metrics type label is always a string

* Bump cookie-cutter-redis version to 1.3.0-beta.13

* Fix wrong string function in Kafka (#197)

* Fix wrong string function in Kafka

* proper conversion of object to string

Co-authored-by: Plamen Ivanov <[email protected]>

* address vulnerability in node-forge package [full ci] (#200)

* create 1.3-rc [full ci]

* bump version, add missing license headers

* fix code dupe

* update changelog

Co-authored-by: Kshitiz Gupta <[email protected]>
Co-authored-by: plameniv <[email protected]>
Co-authored-by: Connor Ross <[email protected]>
Co-authored-by: Tanvir Alam <[email protected]>
Co-authored-by: Chris Pinola <[email protected]>
Co-authored-by: Ilya Butorine <[email protected]>
Co-authored-by: Plamen Ivanov <[email protected]>
Co-authored-by: Sean Halpin <[email protected]>
Co-authored-by: Emma Lynch <[email protected]>
Co-authored-by: Dillon Mulroy <[email protected]>
Co-authored-by: Kshitiz Gupta <[email protected]>
Co-authored-by: Chris Pinola <[email protected]>
Co-authored-by: Marco Garcia <[email protected]>
Co-authored-by: Marco Garcia <[email protected]>
Co-authored-by: prachi30 <[email protected]>
Co-authored-by: [email protected] <[email protected]>
  • Loading branch information
17 people authored Oct 21, 2020
1 parent be9949e commit cf102b3
Show file tree
Hide file tree
Showing 99 changed files with 5,696 additions and 1,765 deletions.
38 changes: 22 additions & 16 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ matrix:
stage: smoke
script: yarn build && yarn lint && yarn test

- name: "Linux Node 8"
- name: "Audit Dependencies"
os: linux
node_js: 8
stage: test
node_js: 10
stage: smoke
if: type = cron
script: yarn audit

- name: "Linux Node 12"
os: linux
node_js: 12
Expand All @@ -39,42 +42,50 @@ matrix:
stage: test
env:
- YARN_GPG=no # Windows build agent will hang without this

- name: "MSSQL"
os: linux
node_js: 8
node_js: 10
stage: integrate
services:
- docker
script: yarn build && cd packages/mssql && yarn integrate

- name: "AMQP"
os: linux
node_js: 10
stage: integrate
services:
- docker
script: yarn build && cd packages/amqp && yarn integrate

- name: "Kafka"
os: linux
node_js: 8
node_js: 10
stage: integrate
services:
- docker
script: yarn build && cd packages/kafka && yarn integrate

- name: "Prometheus"
os: linux
node_js: 8
node_js: 10
stage: integrate
services:
- docker
script: yarn build && cd packages/prometheus && yarn integrate

- name: "S3"
os: linux
node_js: 8
node_js: 10
stage: integrate
services:
- docker
script: yarn build && cd packages/s3 && yarn integrate

- name: "Redis"
os: linux
node_js: 8
node_js: 10
stage: integrate
services:
- docker
Expand All @@ -93,7 +104,7 @@ matrix:
- name: "Publish to GitHub Pages"
if: branch = master
os: linux
node_js: 8
node_js: 10
stage: deploy
env:
- GH_NAME=sklose
Expand All @@ -110,11 +121,6 @@ before_install:
- export PATH="$HOME/.yarn/bin:$PATH"

install:
# https://github.com/walmartlabs/cookie-cutter/pull/51
# the kubernetes-client package currently doesn't support node 8 so as a workaround
# create a special package.json that doesn't try to yarn install it. Rely on other stages
# to build and test it.
- if node --version | grep -q v8\.* ; then cp package.v8.json package.json && echo copied package.v8.json into package.json ; fi
- yarn install
- yarn install --frozen-lockfile

script: yarn build && yarn test
43 changes: 43 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,46 @@
# 1.3

## core

- fixed bug where BoundedPriorityQueue could de-prioritize elements under certain circumstances which could lead to out-of-order processing
- deprecated support for Node v8

## kafka

- detect stale broker metadata and crash kafkajs client; this seems to be a bug in kafkajs where the metadata can get stale and all reconnect attempts of kafkajs will fail.

## prometheus

- allow negative values for histograms

## kubernetes

- less verbose logging

## amqp

- new module with sink and source for AMQP compatible message buses

## protobuf

- support embedding encoded protobuf data as base64 in JSON documents

## redis

- support multiple streams for source and sink
- added missing metrics
- fix messages getting lost when receiving in batches

## azure

- support accessing multiple cosmos collection from within the same service
- added support for dead letter queues for Azure Queues

## mssql

- fixed sink swallows actual error message when a transaction fails to commit


# 1.2

## core
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ The documentation is available [here](https://walmartlabs.github.io/cookie-cutte
| Package | Stable | Beta |
|---------|--------|------|
| core | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-core)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-core) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-core/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-core/v/next) |
| amqp | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-amqp)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-amqp) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-amqp/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-amqp/v/next) |
| azure | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-azure)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-azure) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-azure/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-azure/v/next) |
| gcp | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-gcp)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-gcp) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-gcp/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-gcp/v/next) |
| grpc | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-grpc)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-grpc) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-grpc/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-grpc/v/next) |
| instana | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-instana)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-instana) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-instana/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-instana/v/next) |
| jaeger | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-jaeger)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-jaeger) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-jaeger/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-jaeger/v/next) |
| kafka | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-kafka)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-kafka) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-kafka/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-kafka/v/next) |
| kubernetes | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-kubernetes)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-kubernetes) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-kubernetes/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-kubernetes/v/next) |
| lightstep | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-lightstep)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-lightstep) | [![npm version](https://badgen.net/npm/v/@walmartlabs/cookie-cutter-lightstep/next)](https://www.npmjs.com/package/@walmartlabs/cookie-cutter-lightstep/v/next) |
Expand All @@ -42,4 +44,4 @@ The documentation is available [here](https://walmartlabs.github.io/cookie-cutte

# License

See [LICENSE](LICENSE) and [LICENSE-DOCS](LICENSE-DOCS) for more details.
See [LICENSE](LICENSE.md) and [LICENSE-DOCS](LICENSE-DOCS) for more details.
141 changes: 141 additions & 0 deletions docs/docs/Module_Amqp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
---
id: module-amqp
title: AMQP
---

## amqpSource

The `amqpSource` function creates a new input source that receives messages from `RabbitMQ` or another message broker following the `AMQP 0-9-1` protocol.
The example below starts consuming JSON encoded messages from queue `defaultQueueName`.

```typescript
Application.create()
.input()
.add(amqpSource({
server: {
host: "localhost",
},
queue: {
name: "defaultQueueName",
},
encoder: new JsonMessageEncoder(),
}))
.done()
// ...
.run();
```

### Configuration

The available configuration options are

| name | description |
| ---- | ----------- |
| server.host | host name to connect to |
| _server.port_ | port to connect to (default `5672`) |
| queue.name | name of queue to connect to |
| _queque.durable_ | if `true` (default), queue survives restarts of broker, messages are as persistent as their queue |
| _message.expiration_ | time to live per message in milliseconds (default is no expiration) |
| encoder | defines how the raw data received from AMQP Broker should be converted into message objects |

### Consuming from AMQP Broker

```typescript
Application.create()
.input()
.add(amqpSource({
server: {
host: "localhost",
},
queue: {
name: "defaultQueueName",
},
encoder: new JsonMessageEncoder(),
}))
.done()
.dispatch({
onSomeInputMessage(msg: ISomeInputMessage, ctx: IDispatchContext) {
ctx.publish(SomeInputMessage, msg);
},
})
// ...
.run();
```

### Metadata

The following metadata is available in the message handler via `ctx.metadata<T>(key)`

| name | description |
| ---- | ----------- |
| AmqpMetadata.name | name of queue this message came from |
| AmqpMetadata.Redelivered | indicates that the message has been previously delivered to this or another client. |
| AmqpMetadata.Expiration | message expiration in milliseconds as specified when publishing the message |

### Metrics

| name | description | Type | Tags |
| ---- | ----------- | ---- | ---- |
| cookie_cutter.amqp_consumer.input_msg_received | number of messages received from the broker | `increment` | `host`, `queueName`, `event_type`, `result` |
| cookie_cutter.amqp_consumer.input_msg_processed | number of messages consumed successfully/unsuccessfully | `increment` | `host`, `queueName`, `event_type`, `result` |
| cookie_cutter.amqp_consumer.unassigned_message_count | number of messages in the queue still not assigned to a consumer | `gauge` | `host`, `queueName` |
| cookie_cutter.amqp_consumer.consumer_count | number of consumers for this queue | `gauge` | `host`, `queueName` |

## amqpSink

The `amqpSink` function creates an output sink that handles published messages. The example below starts publishing JSON encoded messages to queue `defaultQueueName`.

```typescript
Application.create()
// ...
.output()
.published(amqpSink({
server: {
host: "localhost",
},
queue: {
name: "defaultQueueName",
},
encoder: new JsonMessageEncoder(),
}))
.done()
// ...
.run();
```

### Configuration

The available configuration options are

| name | description |
| ---- | ----------- |
| server.host | host name to connect to |
| _server.port_ | port to connect to (default `5672`) |
| queue.name | name of queue to connect to |
| _queque.durable_ | if `true` (default), queue survives restarts of broker, messages are as persistent as their queue |
| _message.expiration_ | time to live per message in milliseconds (default is no expiration) |
| encoder | defines how the raw data received from AMQP Broker should be converted into message objects |

### Publishing to AMQP Broker

```typescript
Application.create()
.dispatch({
onSomeInputMessage(msg: ISomeInputMessage, ctx: IDispatchContext) {
ctx.publish(SomeInputMessage, msg);
}
})
.output()
.published(amqpSink({
server: {
host: "localhost",
},
queue: {
name: "defaultQueueName",
},
encoder: new JsonMessageEncoder(),
}))
.done()
// ...
.run();
```
51 changes: 49 additions & 2 deletions docs/docs/Module_Azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ Application.create()
storageAccessKey: "[SOME_KEY]",
queueName: "[QUEUE_NAME]",
encoder: new JsonMessageEncoder(),
visibilityTimeout: 30, // default 30 seconds
numOfMessages: 32, // default 32
visibilityTimeout: 30, // seconds, Azure default: 30 seconds
numOfMessages: 32, // Azure default: 1
}))
.done()
.dispatch({
Expand All @@ -305,3 +305,50 @@ Application.create()
It is recommended to run the service in Serial mode with `queueSource` because once the message is received from Azure Queues its visibility timeout window starts and running the service in serial mode will decrease the chance of hitting the window timeout as messages are queued up internally in Cookie Cutter in Concurrent mode.

Queues items will be reprocessed if you throw an error in the message handler function. The `DequeueCount` metadata can be used to detect reprocessed messages and skip over those if appropriate.

### Dead Letter Queue

It is possible to designate a queue to serve as a dead letter queue. `maxDequeueCount` specifies how many times a message can be dequeued before it is sent to the dead letter queue. The visibility timeout and message time to live will default to the values of the main queue unless the values are explicitly overwritten.

```typescript
Application.create()
.input()
.add(Streaming.queueSource({
storageAccount: "[SOME_ACCOUNT]",
storageAccessKey: "[SOME_KEY]",
queueName: "[QUEUE_NAME]",
encoder: new JsonMessageEncoder(),
deadLetterQueue: {
queueName: "[OTHER_QUEUE_NAME]",
maxDequeueCount: 10,
visibilityTimeout: 30, // seconds, Azure default: 30 seconds
messageTimeToLive: 120, // seconds, Azure default: 7 days
}
}))
.done()
.dispatch({
onSomeTask: (_msg: ISomeTask, _ctx: IDispatchContext) => {
// ...
},
})
.run(ErrorHandlingMode.LogAndContinue, ParallelismMode.Serial);
```

### Metadata

The following metadata is available

| Name | Description |
|------|-------------|
| GrpcMetadata.Peer | the host and port of the client sending the request |
| QueueMetadata.QueueName | Queue name |
| QueueMetadata.VisibilityTimeout | When passed into msg metadata via `publish`/`store`: Specifies the new visibility timeout value, in seconds, relative to server time |
| QueueMetadata.VisibilityTimeoutMs | When passed into msg metadata via `publish`/`store`: Specifies the new visibility timeout value, in milliseconds, relative to server time |
| QueueMetadata.VisibilityTimeout | When read from the MessageRef metadata: Returns the date when the message will next be visible in string format: "Tue, 21 Apr 2020 16:33:23 GMT" |
| QueueMetadata.TimeToLive | When passed into msg metadata via `publish`/`store`: The time-to-live interval for the message, in seconds. |
| QueueMetadata.TimeToLiveMs | When passed into msg metadata via `publish`/`store`: The time-to-live interval for the message, in milliseconds. |
| QueueMetadata.TimeToLive | When read from the MessageRef metadata: Returns the date when the message will expire in string format: "Tue, 21 Apr 2020 16:33:23 GMT" |
| QueueMetadata.DequeueCount | Number of times a message has been dequeued |
| QueueMetadata.TimeToNextVisible | not used |
| QueueMetadata.MessageId | The message identifier of the message |
| QueueMetadata.PopReceipt | A valid pop receipt value returned from an earlier call to the Get Messages or Update Message operation |
Loading

0 comments on commit cf102b3

Please sign in to comment.