Skip to content

Commit

Permalink
attempt to fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aliparlakci committed Feb 6, 2023
1 parent 4386056 commit 9f5e008
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 11 deletions.
1 change: 1 addition & 0 deletions broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ type Configuration struct {
type Broadcaster interface {
Init() (chan []byte, error)
Broadcast([]byte) error
Close() error
}
4 changes: 4 additions & 0 deletions broadcast/causal.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func (c *causal_broadcast_container) Broadcast(content []byte) error {
return nil
}

func (c *causal_broadcast_container) Close() error {
return c.broadcaster.Close()
}

func (c *causal_broadcast_container) handle_incoming_messages() {
for {
var message causal_broadcast_message
Expand Down
2 changes: 1 addition & 1 deletion broadcast/causal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func TestCausalBroadcast(t *testing.T) {
peer_addrs := []string{"localhost:9991", "localhost:9992"}
peer_addrs := []string{"localhost:9981", "localhost:9982"}
messages := [][]byte{randStringBytes(2 << 23), []byte("hello gofret!")}
incoming_channels := []chan []byte{make(chan []byte), make(chan []byte)}
done_signal := make(chan bool)
Expand Down
29 changes: 19 additions & 10 deletions broadcast/communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@ import (
"io"
"log"
"net"
"reflect"
)

type communication struct {
listener net.Listener
address string
listener net.Listener
address string
close_signal chan bool
}

type communicator interface {
Listen() (chan []byte, error)
CloseConnection()
CloseConnection() error
Send(address string, message []byte) error
}

func new_communication(address string) communicator {
return &communication{address: address}
return &communication{address: address, close_signal: make(chan bool)}
}

func (c *communication) Listen() (chan []byte, error) {
Expand Down Expand Up @@ -48,19 +50,26 @@ func (c *communication) Listen() (chan []byte, error) {

go func() {
for {
connection, err := l.Accept()
if err != nil {
log.Fatalf("something happened while trying to accept an incoming connection: %v", err)
select {
case <-c.close_signal:
return
default:
connection, err := l.Accept()
if err != nil {
log.Printf("%v, something happened while trying to accept an incoming connection: %v\n", reflect.TypeOf(err), err)
continue
}
go handle_request(connection)
}
go handle_request(connection)
}
}()

return incoming_messages, nil
}

func (c *communication) CloseConnection() {
c.listener.Close()
func (c *communication) CloseConnection() error {
c.close_signal <- true
return c.listener.Close()
}

func (c *communication) Send(address string, message []byte) error {
Expand Down
4 changes: 4 additions & 0 deletions broadcast/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (f *fifo_broadcast_container) Broadcast(content []byte) error {
return nil
}

func (f *fifo_broadcast_container) Close() error {
return f.broadcaster.Close()
}

func (f *fifo_broadcast_container) handle_incoming_messages() {
for {
var message fifo_broadcast_message
Expand Down
4 changes: 4 additions & 0 deletions broadcast/unordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (bc *broadcast_container) Init() (chan []byte, error) {
return incoming_messages, nil
}

func (bc *broadcast_container) Close() error {
return bc.Communicator.CloseConnection()
}

func UnorderedBroadcast(config Configuration) Broadcaster {
new_broadcast := broadcast_container{peer_addrs: config.PeerAddresses, address: config.SelfAddress}
return &new_broadcast
Expand Down

0 comments on commit 9f5e008

Please sign in to comment.