Skip to content

Commit

Permalink
update review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
fogfish committed Mar 10, 2024
1 parent 43cdfb3 commit c651516
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 55 deletions.
7 changes: 0 additions & 7 deletions broker/events3/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,8 @@ import (
queue "github.com/fogfish/swarm/queue"
)

// const Category = "events3.Event"
const Category = "S3EventRecord"

// type Event swarm.Event[*events.S3EventRecord]

// func (Event) HKT1(swarm.EventType) {}
// func (Event) HKT2(*events.S3EventRecord) {}

func Dequeue(q swarm.Broker) (<-chan swarm.Msg[*events.S3EventRecord], chan<- swarm.Msg[*events.S3EventRecord]) {
return queue.Dequeue[*events.S3EventRecord](q)
//return queue.Dequeue[*events.S3EventRecord, *Event](q)
}
48 changes: 0 additions & 48 deletions broker/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,24 +141,6 @@ func (cli *Client) Ack(digest string) error {
return nil
}

// Ack acknowledges message to broker
// func (cli *Client) Ack(bag swarm.Bag) error {
// ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout)
// defer cancel()

// _, err := cli.service.DeleteMessage(ctx,
// &sqs.DeleteMessageInput{
// QueueUrl: cli.queue,
// ReceiptHandle: aws.String(string(bag.Digest.Brief)),
// },
// )
// if err != nil {
// return swarm.ErrServiceIO.New(err)
// }

// return nil
// }

// Deq dequeues message from broker
func (cli Client) Ask() ([]swarm.Bag, error) {
ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout*2)
Expand Down Expand Up @@ -191,36 +173,6 @@ func (cli Client) Ask() ([]swarm.Bag, error) {
}, nil
}

// Deq dequeues message from broker
// func (cli Client) Deq(cat string) (swarm.Bag, error) {
// ctx, cancel := context.WithTimeout(context.Background(), cli.config.NetworkTimeout)
// defer cancel()

// result, err := cli.service.ReceiveMessage(ctx,
// &sqs.ReceiveMessageInput{
// MessageAttributeNames: []string{string(types.QueueAttributeNameAll)},
// QueueUrl: cli.queue,
// MaxNumberOfMessages: 1, // TODO
// WaitTimeSeconds: 10, // TODO
// },
// )
// if err != nil {
// return swarm.Bag{}, swarm.ErrDequeue.New(err)
// }

// if len(result.Messages) == 0 {
// return swarm.Bag{}, nil
// }

// head := result.Messages[0]

// return swarm.Bag{
// Category: attr(&head, "Category"),
// Object: []byte(*head.Body),
// Digest: swarm.Digest{Brief: *head.ReceiptHandle},
// }, nil
// }

func attr(msg *types.Message, key string) string {
val, exists := msg.MessageAttributes[key]
if !exists {
Expand Down
8 changes: 8 additions & 0 deletions codec.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package swarm

import (
Expand Down
8 changes: 8 additions & 0 deletions internal/kernel/kernel.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//
// Copyright (C) 2021 - 2024 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the Apache License Version 2.0. See the LICENSE file for details.
// https://github.com/fogfish/swarm
//

package kernel

import (
Expand Down

0 comments on commit c651516

Please sign in to comment.