Skip to content

Commit

Permalink
Move traceroute response handling out of goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
lamaral committed Aug 23, 2023
1 parent ab3d2cb commit 5c5ea9a
Showing 1 changed file with 28 additions and 38 deletions.
66 changes: 28 additions & 38 deletions command/ripeatlas/traceroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,45 +113,35 @@ func (c *tracerouteCommand) traceroute(match matcher.Result, message msg.Message
slack.MsgOptionTS(message.GetTimestamp()),
)

messageUpdates := make(chan string, 2)

go func() {
defer close(messageUpdates)

subscribeURL := fmt.Sprintf("https://atlas-stream.ripe.net/stream/?streamType=result&msm=%d", measurementResult.Measurements[0])

client := http.Client{Timeout: 240 * time.Second}
response, err = client.Get(subscribeURL)
defer response.Body.Close()
fileScanner := bufio.NewScanner(response.Body)
fileScanner.Split(bufio.ScanLines)
scanner:
for fileScanner.Scan() {
line := fileScanner.Text()

var streamResponse StreamingResponse
err = json.Unmarshal([]byte(line), &streamResponse)
if err != nil {
log.Errorf("Error unmarshaling streamResponse: %s", err)
}

switch streamResponse.Type {
case "atlas_subscribed":
log.Debugf("Successfully subscribed to measurement")
case "atlas_result":
srp := streamResponse.Payload
messageUpdates <- fmt.Sprintf("%s", srp)
break scanner
}
subscribeURL := fmt.Sprintf("https://atlas-stream.ripe.net/stream/?streamType=result&msm=%d", measurementResult.Measurements[0])

client := http.Client{Timeout: 240 * time.Second}
response, err = client.Get(subscribeURL)
defer response.Body.Close()
fileScanner := bufio.NewScanner(response.Body)
fileScanner.Split(bufio.ScanLines)
scanner:
for fileScanner.Scan() {
line := fileScanner.Text()

var streamResponse StreamingResponse
err = json.Unmarshal([]byte(line), &streamResponse)
if err != nil {
log.Errorf("Error unmarshaling streamResponse: %s", err)
}

switch streamResponse.Type {
case "atlas_subscribed":
log.Debugf("Successfully subscribed to measurement")
case "atlas_result":
content := fmt.Sprintf("%s", streamResponse.Payload)
c.SendMessage(
message,
content,
slack.MsgOptionTS(message.GetTimestamp()),
)
break scanner
}
}()

for delta := range messageUpdates {
c.SendMessage(
message,
delta,
slack.MsgOptionTS(message.GetTimestamp()),
)
}
}

Expand Down

0 comments on commit 5c5ea9a

Please sign in to comment.