Skip to content

Commit

Permalink
Add Subreddit Comment Support
Browse files Browse the repository at this point in the history
  • Loading branch information
RGood committed Mar 7, 2021
1 parent 2f1019d commit dca68de
Show file tree
Hide file tree
Showing 5 changed files with 815 additions and 0 deletions.
82 changes: 82 additions & 0 deletions reddit/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,93 @@ func (s *StreamService) Posts(subreddit string, opts ...StreamOpt) (<-chan *Post
return postsCh, errsCh, stop
}

// Comments streams comments from the specified subreddit.
// It returns 2 channels and a function:
// - a channel into which new comments will be sent
// - a channel into which any errors will be sent
// - a function that the client can call once to stop the streaming and close the channels
// Because of the 100 result limit imposed by Reddit when fetching posts, some high-traffic
// streams might drop submissions between API requests, such as when streaming r/all.
func (s *StreamService) Comments(subreddit string, opts ...StreamOpt) (<-chan *Comment, <-chan error, func()) {
streamConfig := &streamConfig{
Interval: defaultStreamInterval,
DiscardInitial: false,
MaxRequests: 0,
}
for _, opt := range opts {
opt(streamConfig)
}

ticker := time.NewTicker(streamConfig.Interval)
commentsCh := make(chan *Comment)
errsCh := make(chan error)

var once sync.Once
stop := func() {
once.Do(func() {
ticker.Stop()
close(commentsCh)
close(errsCh)
})
}

ids := set{}

go func() {
defer stop()

var n int
infinite := streamConfig.MaxRequests == 0

for ; ; <-ticker.C {
n++

comments, err := s.getComments(subreddit)
if err != nil {
errsCh <- err
if !infinite && n >= streamConfig.MaxRequests {
break
}
continue
}

for _, comment := range comments {
id := comment.FullID

// certain comment streams are inconsistent about the completeness of returned comments
// it's not enough to check if we've seen older comments, but we must check for every comment individually
if !ids.Exists(id) {
ids.Add(id)

if streamConfig.DiscardInitial {
streamConfig.DiscardInitial = false
break
}

commentsCh <- comment
}

}

if !infinite && n >= streamConfig.MaxRequests {
break
}
}
}()

return commentsCh, errsCh, stop
}

func (s *StreamService) getPosts(subreddit string) ([]*Post, error) {
posts, _, err := s.client.Subreddit.NewPosts(context.Background(), subreddit, &ListOptions{Limit: 100})
return posts, err
}

func (s *StreamService) getComments(subreddit string) ([]*Comment, error) {
comments, _, err := s.client.Subreddit.Comments(context.Background(), subreddit, &ListOptions{Limit: 100})
return comments, err
}

type set map[string]struct{}

func (s set) Add(v string) {
Expand Down
300 changes: 300 additions & 0 deletions reddit/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,3 +308,303 @@ loop:

require.Len(t, expectedPostIDs, i)
}

func TestStreamService_Comments(t *testing.T) {
client, mux := setup(t)

var counter int
mux.HandleFunc("/r/testsubreddit/comments", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodGet, r.Method)
defer func() { counter++ }()

switch counter {
case 0:
fmt.Fprint(w, `{
"kind": "Listing",
"data": {
"children": [
{
"kind": "t1",
"data": {
"name": "t1_comment1"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment2"
}
}
]
}
}`)
case 1:
fmt.Fprint(w, `{
"kind": "Listing",
"data": {
"children": [
{
"kind": "t1",
"data": {
"name": "t1_comment3"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment1"
}
}
]
}
}`)
case 2:
fmt.Fprint(w, `{
"kind": "Listing",
"data": {
"children": [
{
"kind": "t1",
"data": {
"name": "t1_comment4"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment5"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment6"
}
}
]
}
}`)
case 3:
fmt.Fprint(w, `{
"kind": "Listing",
"data": {
"children": [
{
"kind": "t1",
"data": {
"name": "t1_comment7"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment8"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment9"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment10"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment11"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment12"
}
}
]
}
}`)
default:
fmt.Fprint(w, `{}`)
}
})

comments, errs, stop := client.Stream.Comments("testsubreddit", StreamInterval(time.Millisecond*10), StreamMaxRequests(4))
defer stop()

expectedCommentIds := []string{"t1_comment1", "t1_comment2", "t1_comment3", "t1_comment4", "t1_comment5", "t1_comment6", "t1_comment7", "t1_comment8", "t1_comment9", "t1_comment10", "t1_comment11", "t1_comment12"}
var i int

loop:
for i != len(expectedCommentIds) {
select {
case comment, ok := <-comments:
if !ok {
break loop
}
require.Equal(t, expectedCommentIds[i], comment.FullID)
case err, ok := <-errs:
if !ok {
break loop
}
require.NoError(t, err)
}
i++
}

require.Len(t, expectedCommentIds, i)
}

func TestStreamService_CommentsDiscardInitial(t *testing.T) {
client, mux := setup(t)

var counter int
mux.HandleFunc("/r/testsubreddit/comments", func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, http.MethodGet, r.Method)
defer func() { counter++ }()

switch counter {
case 0:
fmt.Fprint(w, `{
"kind": "Listing",
"data": {
"children": [
{
"kind": "t1",
"data": {
"name": "t1_comment1"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment2"
}
}
]
}
}`)
case 1:
fmt.Fprint(w, `{
"kind": "Listing",
"data": {
"children": [
{
"kind": "t1",
"data": {
"name": "t1_comment3"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment1"
}
}
]
}
}`)
case 2:
fmt.Fprint(w, `{
"kind": "Listing",
"data": {
"children": [
{
"kind": "t1",
"data": {
"name": "t1_comment4"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment5"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment6"
}
}
]
}
}`)
case 3:
fmt.Fprint(w, `{
"kind": "Listing",
"data": {
"children": [
{
"kind": "t1",
"data": {
"name": "t1_comment7"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment8"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment9"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment10"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment11"
}
},
{
"kind": "t1",
"data": {
"name": "t1_comment12"
}
}
]
}
}`)
default:
fmt.Fprint(w, `{}`)
}
})

comments, errs, stop := client.Stream.Comments("testsubreddit", StreamInterval(time.Millisecond*10), StreamMaxRequests(4), StreamDiscardInitial)
defer stop()

expectedCommentIds := []string{"t1_comment3", "t1_comment4", "t1_comment5", "t1_comment6", "t1_comment7", "t1_comment8", "t1_comment9", "t1_comment10", "t1_comment11", "t1_comment12"}
var i int

loop:
for i != len(expectedCommentIds) {
select {
case comment, ok := <-comments:
if !ok {
break loop
}
require.Equal(t, expectedCommentIds[i], comment.FullID)
case err, ok := <-errs:
if !ok {
break loop
}
require.NoError(t, err)
}
i++
}

require.Len(t, expectedCommentIds, i)
}
Loading

0 comments on commit dca68de

Please sign in to comment.