Commit

set up websocket endpoint test
Kylie Pace committed Jan 14, 2021
1 parent b105014 commit 6a55c30
Showing 8 changed files with 95 additions and 62 deletions.
15 changes: 4 additions & 11 deletions README.md
@@ -38,26 +38,19 @@ There was some risk for me in choosing Kafka over the Google Cloud Platform Pub/

Speaking of running locally, I didn't completely divorce the two services as I would if they were really deployed separately; keeping this a single package with a single `package.json` file makes it easier to use as an MVP.

-Once the consuming service receives data through the Kafka topic, that data gets saved to a Redis instance. A more typical solution with Kafka would probably be Faust or another stream-processing framework; however, I found the options for Node less developed, didn't want to introduce a second language into my code sample, and think Redis does essentially what I'd be doing if I used a framework to transform data from streams into tables.
+Once the consuming service receives data through the Kafka topic, that data gets saved to a Redis instance. A more typical solution with Kafka would probably be Faust or another stream-processing framework; however, I found the options for Node less developed. Initially I chose Redis because it's a fast, light in-memory database, but its limitations meant the data could not be saved in the transformed schema defined by the coding instructions.

-Of course, this raises the question: if I'm using Redis already, why not use Redis Pub/Sub as the messaging system? My only reason for Kafka is that I wanted to see whether I could set up a Kafka system in Node; I think the Redis approach looks good, too.
+Therefore, to save the data in its transformed state, I replaced Redis with MongoDB, whose features for working with JSON are more powerful.


-ZADD [session_id] score[timestamp] member[name]
-ZADD 12 1569972083 cart_loaded
-orders from smallest to greatest timestamp
-ZRANGE [session_id] 0 -1 WITHSCORES // 0 and -1 are indices
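The removed Redis commands above kept each session's events ordered by their timestamp score. The same idea can be sketched in plain TypeScript (the types and names here are mine, not from the repo):

```typescript
// Sketch of what the sorted set was doing: events keyed by a timestamp
// "score", read back from smallest to greatest (ZRANGE 0 -1 WITHSCORES).
interface SessionEvent {
  name: string;      // the sorted-set member
  timestamp: number; // the sorted-set score
}

function orderedEvents(events: SessionEvent[]): SessionEvent[] {
  return [...events].sort((a, b) => a.timestamp - b.timestamp);
}

const session: SessionEvent[] = [
  { name: "purchase_completed", timestamp: 1569972085 },
  { name: "cart_loaded", timestamp: 1569972083 },
];
console.log(orderedEvents(session).map(e => e.name)); // cart_loaded first
```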

### What I'd add
- a proper logging client so that logs are searchable by session_id and request_id header
- security & rate limiting
-- CICD
+- CI/CD
- architecture: no need for this to be a monolithic application
- redis instance not localhost
- replace services from docker-compose with actual deployed instances
- validate event model





[why ip address is required](https://github.com/wurstmeister/kafka-docker/wiki/Connectivity)
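The linked wiki comes down to this: the Kafka broker must advertise an address that clients outside the container can actually reach, not `localhost` inside the container. A minimal sketch, assuming the `wurstmeister/kafka` image and a `HOST_IP` variable exported before `docker-compose up` (these names come from that image's documentation, not from this repo's compose file):

```yaml
kafka:
  image: wurstmeister/kafka
  ports:
    - "9092:9092"
  environment:
    # must be reachable from the host machine, not "localhost" in the container
    KAFKA_ADVERTISED_HOST_NAME: ${HOST_IP}
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
```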
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -13,9 +13,9 @@ services:
  mongodb:
    image: mongo:latest
    environment:
-     MONGO_INITDB_ROOT_USERNAME: ${MONGO_INITDB_ROOT_USERNAME}
+     MONGO_INITDB_ROOT_USERNAME: demo_user
      MONGO_INITDB_ROOT_PASSWORD: ${MONGO_INITDB_ROOT_PASSWORD}
-     MONGO_INITDB_DATABASE: ${MONGO_INITDB_DATABASE}
+     MONGO_INITDB_DATABASE: 'events'
    ports:
      - "27017:27017"

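With the root username now hardcoded to `demo_user` and the database to `events`, the URI the Node client needs (read from `process.env.MONGO_URI` in `src/clients/MongoClient.ts`) would look something like this sketch; the password value is hypothetical:

```shell
# .env (hypothetical values)
MONGO_INITDB_ROOT_PASSWORD=change_me
MONGO_URI=mongodb://demo_user:change_me@localhost:27017/events?authSource=admin
```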
6 changes: 5 additions & 1 deletion src/clients/MongoClient.ts
@@ -10,7 +10,7 @@ class MongoConnector {
  cachedClient?: MongoClient;
  private client: MongoClient

-  constructor(uri: string = `${HOST_IP}:27017`) {
+  constructor(uri: string = `mongodb://localhost:27017`) {
    this.client = new MongoClient(uri, {
      useUnifiedTopology: true,
      useNewUrlParser: true,
@@ -26,6 +26,10 @@
    }
    return this.cachedClient;
  }

+  disconnect(){
+    this.client.close();
+  }
}

export default new MongoConnector(process.env.MONGO_URI);
3 changes: 2 additions & 1 deletion src/consumers/KafkaConsumer.ts
@@ -32,9 +32,10 @@ export default class KafkaConsumer {
  }

  async subscribe(): Promise<void> {
-    return await this.consumer.subscribe({
+    await this.consumer.subscribe({
      topic: this.topicName,
      fromBeginning: true
    });
+    console.log('consumer subscribed')
  }
}
11 changes: 11 additions & 0 deletions src/repositories/MongoRepository.ts
@@ -1,6 +1,7 @@
import {
  Db,
  Collection,
+ DeleteWriteOpResultObject,
  FindAndModifyWriteOpResultObject,
  ObjectID,
  FilterQuery,
@@ -40,6 +41,16 @@ export default class MongoRepository {
    return this.cachedDb ? this.cachedDb.collection(this.collectionName) : await this.connect();
  }

+  async delete<T>(query: FilterQuery<T>): Promise<void>{
+    const collection = await this.collection()
+    const result = await collection.deleteOne(query);
+    console.log(`${result.deletedCount} documents deleted`);
+  }
+
+  disconnect(){
+    MongoClient.disconnect();
+  }
+
  async findOne<T>(query: FilterQuery<T>, options?: FindOneOptions<T extends any ? any : T>): Promise<T | null> {
    const collection = await this.collection()
    return collection.findOne(
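For context on what saving "in the transformed schema" could look like once events reach the repository, here is a hypothetical fold of raw events into one per-session document; the shape and field names are assumptions, not part of this commit:

```typescript
interface RawEvent { timestamp: number; type: string; name?: string; session_id?: string }
interface SessionDoc {
  session_id: string;
  started?: number;
  ended?: number;
  events: { name: string; timestamp: number }[];
}

// Fold raw websocket events into a single per-session document,
// ordered by timestamp, suitable for one MongoDB upsert.
function foldEvents(sessionId: string, raw: RawEvent[]): SessionDoc {
  const doc: SessionDoc = { session_id: sessionId, events: [] };
  for (const e of [...raw].sort((a, b) => a.timestamp - b.timestamp)) {
    if (e.type === "SESSION_START") doc.started = e.timestamp;
    else if (e.type === "SESSION_END") doc.ended = e.timestamp;
    else if (e.type === "EVENT" && e.name) {
      doc.events.push({ name: e.name, timestamp: e.timestamp });
    }
  }
  return doc;
}

console.log(foldEvents("s1", [
  { timestamp: 3, type: "SESSION_END" },
  { timestamp: 1, type: "SESSION_START" },
  { timestamp: 2, type: "EVENT", name: "cart_loaded" },
]));
```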
1 change: 0 additions & 1 deletion src/server/services/PublishData.ts
@@ -11,7 +11,6 @@ export default class PublishDataService {
   * publish the data to the topic defined by the producer model
   */
  async publish(msg: string, sessionId: string): Promise<RecordMetadata[]>{
-    console.log(sessionId)
    const message = {
      value: msg,
      key: sessionId.toString()
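The `key: sessionId.toString()` in the producer matters because Kafka routes messages with the same key to the same partition, which preserves per-session ordering. A toy illustration of key-based routing (a stand-in for the idea only; it is not Kafka's real murmur2 partitioner):

```typescript
// Toy key-based partitioner: a deterministic hash of the key modulo the
// partition count, so the same key always maps to the same partition.
function toyPartition(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) hash = (hash * 31 + ch.charCodeAt(0)) | 0;
  return Math.abs(hash) % numPartitions;
}

const sessionId = "e6fa79ca-d142-4046-a134-5134f16a0b5e";
// every message keyed by this session lands on the same partition
const p1 = toyPartition(sessionId, 3);
const p2 = toyPartition(sessionId, 3);
console.log(p1 === p2); // true
```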
46 changes: 0 additions & 46 deletions test/websocket.js

This file was deleted.

71 changes: 71 additions & 0 deletions test/websocket.test.ts
@@ -0,0 +1,71 @@
import WebSocket from 'ws';
import MongoRepository from '../src/repositories/MongoRepository';

describe('websocket http request', () => {
  const database = new MongoRepository();
  const session_id = "e6fa79ca-d142-4046-a134-5134f16a0b5e";
  let clients;

  async function run() {
    clients[0].send(
      JSON.stringify([{
        timestamp: 1569972082,
        type: "SESSION_START",
        session_id
      }])
    );

    clients[0].send(
      JSON.stringify([{
        timestamp: 1569972085,
        type: "EVENT",
        name: "basket_removed"
      }])
    );

    clients[0].send(
      JSON.stringify([{
        timestamp: 1569972083,
        type: "EVENT",
        name: "purchase_completed"
      }])
    );

    console.log('client sent message')

    clients[0].send(
      JSON.stringify([{
        timestamp: 1569972086,
        type: "SESSION_END",
        session_id
      }])
    );
    console.log('client sent final message')

    return;
  }

  before(async () => {
    // clean up data
    await database.delete({
      session_id
    });

    clients = [
      new WebSocket('ws://localhost:8080/websocket?session_id="e6fa79ca-d142-4046-a134-5134f16a0b5e"')
    ];

    // Wait for the client to connect using async/await
    await new Promise(resolve => clients[0].once('open', resolve));
    console.log('connected');

    // send messages
    await run();
  });

  it('saves data', () => {

  })
});
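The `it('saves data')` body is still empty in this commit. One way to eventually fill it in is to poll the repository until the consumer has written the document. The generic polling helper below is runnable on its own; the commented usage assumes the repo's `findOne` query shape, which is my assumption rather than part of this commit:

```typescript
// Hypothetical test helper: poll an async predicate until it passes or times out.
async function waitFor(
  check: () => Promise<boolean>,
  timeoutMs = 2000,
  intervalMs = 50
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await check()) return;
    await new Promise(resolve => setTimeout(resolve, intervalMs));
  }
  throw new Error("condition not met before timeout");
}

// Possible use inside the test (assumes MongoRepository#findOne):
// await waitFor(async () => (await database.findOne({ session_id })) !== null);
```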

