diff --git a/.pubnub.yml b/.pubnub.yml index 522fea9e..cda78200 100644 --- a/.pubnub.yml +++ b/.pubnub.yml @@ -1,5 +1,15 @@ --- changelog: + - + changes: + - + text: "Implement history Message Counts" + type: improvement + - + text: "All request were secure (https), even when the Secure flag was false" + type: bug + date: Mar 5, 19 + version: v4.1.7 - changes: - @@ -276,6 +286,7 @@ features: - STORAGE-COUNT - STORAGE-DELETE-MESSAGES - STORAGE-FETCH-MESSAGES + - STORAGE-MESSAGE-COUNT subscribe: - SUBSCRIBE-CHANNELS - SUBSCRIBE-CHANNEL-GROUPS @@ -313,4 +324,4 @@ supported-platforms: - "Mac OS X 10.8 or later, amd64" - "Windows 7 or later, amd64, 386" version: "PubNub Go SDK" -version: v4.1.6 +version: v4.1.7 diff --git a/README.md b/README.md index 77508439..7eea5277 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# PubNub 4.1.6 client for Go +# PubNub 4.1.7 client for Go * Go (1.9+) # Please direct all Support Questions and Concerns to Support@PubNub.com diff --git a/VERSION b/VERSION index 561ad334..9edf2a44 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -4.1.6 +4.1.7 diff --git a/endpoints.go b/endpoints.go index 3c01ccfa..ad0f1c7e 100644 --- a/endpoints.go +++ b/endpoints.go @@ -121,9 +121,14 @@ func buildURL(o endpointOpts) (*url.URL, error) { path = fmt.Sprintf("//%s%s", o.config().Origin, path) + secure := "" + if o.config().Secure { + secure = "s" + } + retURL := &url.URL{ Opaque: path, - Scheme: "https", + Scheme: fmt.Sprintf("http%s", secure), Host: o.config().Origin, RawQuery: stringifiedQuery, } diff --git a/enums.go b/enums.go index a7a0ffbe..41f1a98d 100644 --- a/enums.go +++ b/enums.go @@ -106,6 +106,8 @@ const ( PNAccessManagerRevoke // PNDeleteMessagesOperation is the enum used for the Delete Messages from History operation. PNDeleteMessagesOperation + // PNMessageCountsOperation is the enum used for History with messages operation. + PNMessageCountsOperation ) const ( diff --git a/examples/cli/cli_demo.go b/examples/cli/cli_demo.go index e7f69875..684c8af3 100644 --- a/examples/cli/cli_demo.go +++ b/examples/cli/cli_demo.go @@ -58,6 +58,7 @@ func connect() { config.PublishKey = "demo" config.SubscribeKey = "demo" config.SecretKey = "demo" + //config.Secure = false config.AuthKey = "akey" @@ -160,6 +161,7 @@ func showHelp() { showSubscribeWithStateHelp() showPresenceTimeoutHelp() showPresenceHelp() + showMessageCountsHelp() fmt.Println("") fmt.Println("================") fmt.Println(" || COMMANDS ||") @@ -169,6 +171,12 @@ func showHelp() { fmt.Println(" QUIT \n\tctrl+c ") } +func showMessageCountsHelp() { + fmt.Println(" MessageCounts EXAMPLE: ") + fmt.Println(" messageCounts Channel(s) timetoken timetoken1,timetoken2") + fmt.Println(" messageCounts my-channel,my-channel1 15210190573608384 15210190573608384,15211140747622125") +} + func showGetStateHelp() { fmt.Println(" GET STATE EXAMPLE: ") fmt.Println(" getstate Channel ") @@ -337,6 +345,8 @@ func readCommand(cmd string) { setPresenceTimeout(command[1:]) case "presence": runPresenceRequest(command[1:]) + case "messageCounts": + messageCounts(command[1:]) case "q": pn.UnsubscribeAll() case "d": @@ -346,6 +356,32 @@ func readCommand(cmd string) { } } +func messageCounts(args []string) { + if len(args) < 2 { + showMessageCountsHelp() + } + + var channels []string + channels = strings.Split(args[0], ",") + + var timetoken string + timetoken = args[1] + + var channelsTimetoken []string + if len(args) > 2 { + channelsTimetoken = strings.Split(args[2], ",") + } + + res, status, err := pn.MessageCounts().Channels(channels).Timetoken(timetoken).ChannelsTimetoken(channelsTimetoken).Execute() + fmt.Println(status) + fmt.Println(err) + for ch, v := range res.Channels { + fmt.Printf("%s %d", ch, v) + fmt.Println("") + } + +} + func runPresenceRequest(args []string) { if len(args) < 2 { showPresenceHelp() diff --git a/message_counts.go b/message_counts.go new file mode 100644 index 00000000..5e22e935 --- /dev/null +++ b/message_counts.go @@ -0,0 +1,219 @@ +package pubnub + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + + "github.com/pubnub/go/pnerr" + "github.com/pubnub/go/utils" + "reflect" + "strings" + + "net/http" + "net/url" +) + +var emptyMessageCountsResp *MessageCountsResponse + +const messageCountsPath = "/v3/history/sub-key/%s/message-counts/%s" + +type messageCountsBuilder struct { + opts *messageCountsOpts +} + +func newMessageCountsBuilder(pubnub *PubNub) *messageCountsBuilder { + builder := messageCountsBuilder{ + opts: &messageCountsOpts{ + pubnub: pubnub, + }, + } + + return &builder +} + +func newMessageCountsBuilderWithContext(pubnub *PubNub, + context Context) *messageCountsBuilder { + builder := messageCountsBuilder{ + opts: &messageCountsOpts{ + pubnub: pubnub, + ctx: context, + }, + } + + return &builder +} + +// Channels sets the Channels for the MessageCounts request. +func (b *messageCountsBuilder) Channels(channels []string) *messageCountsBuilder { + b.opts.Channels = channels + return b +} + +// Timetoken sets the number of items to return in the MessageCounts request. +func (b *messageCountsBuilder) Timetoken(timetoken string) *messageCountsBuilder { + b.opts.Timetoken = timetoken + return b +} + +// ChannelsTimetoken sets the order of messages in the MessageCounts request. +func (b *messageCountsBuilder) ChannelsTimetoken(channelsTimetoken []string) *messageCountsBuilder { + b.opts.ChannelsTimetoken = channelsTimetoken + return b +} + +// QueryParam accepts a map, the keys and values of the map are passed as the query string parameters of the URL called by the API. +func (b *messageCountsBuilder) QueryParam(queryParam map[string]string) *messageCountsBuilder { + b.opts.QueryParam = queryParam + + return b +} + +// Transport sets the Transport for the MessageCounts request. +func (b *messageCountsBuilder) Transport(tr http.RoundTripper) *messageCountsBuilder { + b.opts.Transport = tr + return b +} + +// Execute runs the MessageCounts request. +func (b *messageCountsBuilder) Execute() (*MessageCountsResponse, StatusResponse, error) { + rawJSON, status, err := executeRequest(b.opts) + if err != nil { + return emptyMessageCountsResp, status, err + } + + return newMessageCountsResponse(rawJSON, b.opts, status) +} + +type messageCountsOpts struct { + pubnub *PubNub + + Channels []string + Timetoken string + ChannelsTimetoken []string + + QueryParam map[string]string + + // nil hacks + Transport http.RoundTripper + + ctx Context +} + +func (o *messageCountsOpts) config() Config { + return *o.pubnub.Config +} + +func (o *messageCountsOpts) client() *http.Client { + return o.pubnub.GetClient() +} + +func (o *messageCountsOpts) context() Context { + return o.ctx +} + +func (o *messageCountsOpts) validate() error { + if o.config().SubscribeKey == "" { + return newValidationError(o, StrMissingSubKey) + } + + if len(o.Channels) <= 0 { + return newValidationError(o, StrMissingChannel) + } + + return nil +} + +func (o *messageCountsOpts) buildPath() (string, error) { + channels := utils.JoinChannels(o.Channels) + + return fmt.Sprintf(messageCountsPath, + o.pubnub.Config.SubscribeKey, + channels), nil +} + +func (o *messageCountsOpts) buildQuery() (*url.Values, error) { + q := defaultQuery(o.pubnub.Config.UUID, o.pubnub.telemetryManager) + + q.Set("timetoken", o.Timetoken) + q.Set("channelsTimetoken", strings.Join(o.ChannelsTimetoken, ",")) + SetQueryParam(q, o.QueryParam) + + return q, nil +} + +func (o *messageCountsOpts) jobQueue() chan *JobQItem { + return o.pubnub.jobQueue +} + +func (o *messageCountsOpts) buildBody() ([]byte, error) { + return []byte{}, nil +} + +func (o *messageCountsOpts) httpMethod() string { + return "GET" +} + +func (o *messageCountsOpts) isAuthRequired() bool { + return true +} + +func (o *messageCountsOpts) requestTimeout() int { + return o.pubnub.Config.NonSubscribeRequestTimeout +} + +func (o *messageCountsOpts) connectTimeout() int { + return o.pubnub.Config.ConnectTimeout +} + +func (o *messageCountsOpts) operationType() OperationType { + return PNMessageCountsOperation +} + +func (o *messageCountsOpts) telemetryManager() *TelemetryManager { + return o.pubnub.telemetryManager +} + +// MessageCountsResponse is the response to MessageCounts request. It contains a map of type MessageCountsResponseItem +type MessageCountsResponse struct { + Channels map[string]int +} + +//http://ps.pndsn.com/v3/history/sub-key/demo/message-counts/my-channel,my-channel1?timestamp=1549982652&pnsdk=PubNub-Go/4.1.6&uuid=pn-82f145ea-adc3-4917-a11d-76a957347a82&timetoken=15499825804610610&channelsTimetoken=15499825804610610,15499925804610615&auth=akey&signature=pVDVge_suepcOlSMllpsXg_jpOjtEpW7B3HHFaViI4s= +//{"status": 200, "error": false, "error_message": "", "channels": {"my-channel1":1,"my-channel":2}} +func newMessageCountsResponse(jsonBytes []byte, o *messageCountsOpts, + status StatusResponse) (*MessageCountsResponse, StatusResponse, error) { + + resp := &MessageCountsResponse{} + + var value interface{} + + err := json.Unmarshal(jsonBytes, &value) + if err != nil { + e := pnerr.NewResponseParsingError("Error unmarshalling response", + ioutil.NopCloser(bytes.NewBufferString(string(jsonBytes))), err) + + return emptyMessageCountsResp, status, e + } + + if result, ok := value.(map[string]interface{}); ok { + o.pubnub.Config.Log.Println(result["channels"]) + if channels, ok1 := result["channels"].(map[string]interface{}); ok1 { + if channels != nil { + resp.Channels = make(map[string]int) + for ch, v := range channels { + resp.Channels[ch] = int(v.(float64)) + } + } else { + o.pubnub.Config.Log.Printf("type assertion to map failed %v\n", result) + } + } else { + o.pubnub.Config.Log.Println("Assertion failed", reflect.TypeOf(result["channels"])) + } + } else { + o.pubnub.Config.Log.Printf("type assertion to map failed %v\n", value) + } + + return resp, status, nil +} diff --git a/message_counts_test.go b/message_counts_test.go new file mode 100644 index 00000000..97aee916 --- /dev/null +++ b/message_counts_test.go @@ -0,0 +1,169 @@ +package pubnub + +import ( + "fmt" + "testing" + + h "github.com/pubnub/go/tests/helpers" + "github.com/stretchr/testify/assert" +) + +func AssertSuccessMessageCountsGet(t *testing.T, expectedString string, channels []string, timetoken string, channelsTimetoken []string) { + assert := assert.New(t) + + opts := &messageCountsOpts{ + Channels: channels, + Timetoken: timetoken, + ChannelsTimetoken: channelsTimetoken, + pubnub: pubnub, + } + + path, err := opts.buildPath() + assert.Nil(err) + + h.AssertPathsEqual(t, + fmt.Sprintf("/v3/history/sub-key/sub_key/message-counts/%s", expectedString), + path, []int{}) + + body, err := opts.buildBody() + assert.Nil(err) + + assert.Empty(body) +} + +func TestMessageCountsPath(t *testing.T) { + channels := []string{"test1", "test2"} + channelsTimetoken := []string{"15499825804610610", "15499925804610615"} + AssertSuccessMessageCountsGet(t, "test1,test2", channels, "15499825804610610", channelsTimetoken) +} + +func TestMessageCountsQuery(t *testing.T) { + channels := []string{"test1", "test2"} + channelsTimetoken := []string{"15499825804610610", "15499925804610615"} + AssertSuccessMessageCountsGetQuery(t, "15499825804610610", "15499825804610610,15499925804610615", channels, "15499825804610610", channelsTimetoken) +} + +func TestMessageCountsQuery2(t *testing.T) { + channels := []string{"test1", "test2"} + channelsTimetoken := []string{} + AssertSuccessMessageCountsGetQuery(t, "15499825804610610", "", channels, "15499825804610610", channelsTimetoken) +} + +func TestMessageCountsQuery3(t *testing.T) { + channels := []string{"test1", "test2"} + channelsTimetoken := []string{"15499825804610610", "15499925804610615"} + AssertSuccessMessageCountsGetQuery(t, "", "15499825804610610,15499925804610615", channels, "", channelsTimetoken) +} + +func AssertSuccessMessageCountsGetQuery(t *testing.T, expectedString1 string, expectedString2 string, channels []string, timetoken string, channelsTimetoken []string) { + assert := assert.New(t) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + opts := &messageCountsOpts{ + Channels: channels, + Timetoken: timetoken, + ChannelsTimetoken: channelsTimetoken, + pubnub: pubnub, + QueryParam: queryParam, + } + + u, err := opts.buildQuery() + assert.Nil(err) + + assert.Equal("v1", u.Get("q1")) + assert.Equal("v2", u.Get("q2")) + + assert.Equal(expectedString1, u.Get("timetoken")) + assert.Equal(expectedString2, u.Get("channelsTimetoken")) + +} + +func AssertNewMessageCountsBuilder(t *testing.T, testQueryParam bool, testContext bool, expectedString string, expectedString1 string, expectedString2 string, channels []string, timetoken string, channelsTimetoken []string) { + assert := assert.New(t) + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + o := newMessageCountsBuilder(pubnub) + if testContext { + o = newMessageCountsBuilderWithContext(pubnub, backgroundContext) + } + o.Channels(channels) + o.Timetoken(timetoken) + o.ChannelsTimetoken(channelsTimetoken) + if testQueryParam { + o.QueryParam(queryParam) + } + + path, err := o.opts.buildPath() + assert.Nil(err) + + h.AssertPathsEqual(t, + fmt.Sprintf("/v3/history/sub-key/sub_key/message-counts/%s", expectedString), + path, []int{}) + + u, _ := o.opts.buildQuery() + + if testQueryParam { + assert.Equal("v1", u.Get("q1")) + assert.Equal("v2", u.Get("q2")) + } + + assert.Equal(expectedString1, u.Get("timetoken")) + assert.Equal(expectedString2, u.Get("channelsTimetoken")) + +} + +func TestMessageCountsBuilder(t *testing.T) { + channels := []string{"test1", "test2"} + channelsTimetoken := []string{"15499825804610610", "15499925804610615"} + AssertNewMessageCountsBuilder(t, false, false, "test1,test2", "15499825804610610", "15499825804610610,15499925804610615", channels, "15499825804610610", channelsTimetoken) +} + +func TestMessageCountsBuilderQP(t *testing.T) { + channels := []string{"test1", "test2"} + channelsTimetoken := []string{"15499825804610610", "15499925804610615"} + AssertNewMessageCountsBuilder(t, true, false, "test1,test2", "15499825804610610", "15499825804610610,15499925804610615", channels, "15499825804610610", channelsTimetoken) +} + +func TestMessageCountsBuilderContext(t *testing.T) { + channels := []string{"test1", "test2"} + channelsTimetoken := []string{"15499825804610610", "15499925804610615"} + AssertNewMessageCountsBuilder(t, false, true, "test1,test2", "15499825804610610", "15499825804610610,15499925804610615", channels, "15499825804610610", channelsTimetoken) +} + +func TestMessageCountsBuilderContextQP(t *testing.T) { + channels := []string{"test1", "test2"} + channelsTimetoken := []string{"15499825804610610", "15499925804610615"} + AssertNewMessageCountsBuilder(t, true, true, "test1,test2", "15499825804610610", "15499825804610610,15499925804610615", channels, "15499825804610610", channelsTimetoken) +} + +func TestMessageCountsResponseValueError(t *testing.T) { + assert := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + opts := &messageCountsOpts{ + pubnub: pn, + } + jsonBytes := []byte(`s`) + + _, _, err := newMessageCountsResponse(jsonBytes, opts, StatusResponse{}) + assert.Equal("pubnub/parsing: Error unmarshalling response: {s}", err.Error()) +} + +//{"status": 200, "error": false, "error_message": "", "channels": {"my-channel1":1,"my-channel":2}} +func TestMessageCountsResponseValuePass(t *testing.T) { + assert := assert.New(t) + pn := NewPubNub(NewDemoConfig()) + opts := &messageCountsOpts{ + pubnub: pn, + } + jsonBytes := []byte(`{"status": 200, "error": false, "error_message": "", "channels": {"my-channel1":1,"my-channel":2}}`) + + res, _, err := newMessageCountsResponse(jsonBytes, opts, StatusResponse{}) + assert.Equal(2, res.Channels["my-channel"]) + assert.Equal(1, res.Channels["my-channel1"]) + assert.Nil(err) +} diff --git a/pubnub.go b/pubnub.go index aaa8f78b..0aae63d2 100644 --- a/pubnub.go +++ b/pubnub.go @@ -12,7 +12,7 @@ import ( // Default constants const ( // Version :the version of the SDK - Version = "4.1.6" + Version = "4.1.7" // MaxSequence for publish messages MaxSequence = 65535 ) @@ -94,6 +94,14 @@ func (pn *PubNub) FetchWithContext(ctx Context) *fetchBuilder { return newFetchBuilderWithContext(pn, ctx) } +func (pn *PubNub) MessageCounts() *messageCountsBuilder { + return newMessageCountsBuilder(pn) +} + +func (pn *PubNub) MessageCountsWithContext(ctx Context) *messageCountsBuilder { + return newMessageCountsBuilderWithContext(pn, ctx) +} + func (pn *PubNub) SetState() *setStateBuilder { return newSetStateBuilder(pn) } diff --git a/request.go b/request.go index 21fe7f3d..7943b9f3 100644 --- a/request.go +++ b/request.go @@ -217,19 +217,19 @@ func parseResponse(resp *http.Response, opts endpointOpts) ([]byte, StatusRespon if resp.StatusCode == 408 { opts.config().Log.Println("PNTimeoutCategory: resp.StatusCode, resp.Body, resp.Request.URL", resp.StatusCode, resp.Body, resp.Request.URL) - status = createStatus(PNTimeoutCategory, "", ResponseInfo{}, e) + status = createStatus(PNTimeoutCategory, "", ResponseInfo{StatusCode: resp.StatusCode}, e) return nil, status, e } if resp.StatusCode == 400 { opts.config().Log.Println("PNBadRequestCategory: resp.StatusCode, resp.Body, resp.Request.URL", resp.StatusCode, resp.Body, resp.Request.URL) - status = createStatus(PNBadRequestCategory, "", ResponseInfo{}, e) + status = createStatus(PNBadRequestCategory, "", ResponseInfo{StatusCode: resp.StatusCode}, e) return nil, status, e } opts.config().Log.Println("PNUnknownCategory: resp.StatusCode, resp.Body, resp.Request.URL", resp.StatusCode, resp.Body, resp.Request.URL) - status = createStatus(PNUnknownCategory, "", ResponseInfo{}, e) + status = createStatus(PNUnknownCategory, "", ResponseInfo{StatusCode: resp.StatusCode, Operation: opts.operationType()}, e) return nil, status, e } diff --git a/telemetry_manager.go b/telemetry_manager.go index 80f4265e..c78f20fe 100644 --- a/telemetry_manager.go +++ b/telemetry_manager.go @@ -151,8 +151,13 @@ func telemetryEndpointNameForOperation(t OperationType) string { case PNPublishOperation: endpoint = "pub" break + case PNMessageCountsOperation: + endpoint = "mc" + break case PNHistoryOperation: fallthrough + case PNFetchMessagesOperation: + fallthrough case PNDeleteMessagesOperation: endpoint = "hist" break diff --git a/tests/e2e/fetch_test.go b/tests/e2e/fetch_test.go index 24f1c84d..e47e4a09 100644 --- a/tests/e2e/fetch_test.go +++ b/tests/e2e/fetch_test.go @@ -5,6 +5,7 @@ import ( //"log" //"os" "testing" + "time" pubnub "github.com/pubnub/go" a "github.com/stretchr/testify/assert" @@ -46,7 +47,9 @@ func TestFetch(t *testing.T) { timestamp2 = GetTimetoken(pn) } pn.Publish().Channel(ch1).Message(fmt.Sprintf("testch1 %d", i)).Execute() + time.Sleep(1 * time.Second) pn.Publish().Channel(ch2).Message(fmt.Sprintf("testch2 %d", i)).Execute() + time.Sleep(1 * time.Second) } timestamp3 := GetTimetoken(pn) diff --git a/tests/e2e/message_counts_test.go b/tests/e2e/message_counts_test.go new file mode 100644 index 00000000..ee5c686a --- /dev/null +++ b/tests/e2e/message_counts_test.go @@ -0,0 +1,92 @@ +package e2e + +import ( + "fmt" + //"log" + //"os" + "strconv" + "testing" + "time" + + pubnub "github.com/pubnub/go" + a "github.com/stretchr/testify/assert" +) + +func MatchMessageCounts(ret *pubnub.MessageCountsResponse, count1, count2 int, ch1, ch2 string, assert *a.Assertions) { + for ch, v := range ret.Channels { + if ch == ch1 { + assert.Equal(count1, v) + } + if ch == ch2 { + assert.Equal(count2, v) + } + + } +} + +func TestMessageCounts(t *testing.T) { + assert := a.New(t) + + pn := pubnub.NewPubNub(pamConfigCopy()) + //pn.Config.Log = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile) + + r := GenRandom() + ch1 := fmt.Sprintf("testChannel_sub_%d", r.Intn(99999)) + ch2 := fmt.Sprintf("testChannel_sub_%d", r.Intn(99999)) + + timestamp1 := GetTimetoken(pn) + timestamp2 := int64(0) + + for i := 0; i < 10; i++ { + if i == 5 { + timestamp2 = GetTimetoken(pn) + } + + pn.Publish().Channel(ch1).Message(fmt.Sprintf("testch1 %d", i)).Execute() + time.Sleep(1 * time.Second) + if i < 6 { + pn.Publish().Channel(ch2).Message(fmt.Sprintf("testch2 %d", i)).Execute() + time.Sleep(1 * time.Second) + } + + } + + timestamp3 := GetTimetoken(pn) + fmt.Println("here", strconv.FormatInt(timestamp2, 10), strconv.FormatInt(timestamp3, 10)) + + ret, s, err := pn.MessageCounts(). + Channels([]string{ch1, ch2}). + ChannelsTimetoken([]string{strconv.FormatInt(timestamp2, 10), strconv.FormatInt(timestamp3, 10)}). + Execute() + + fmt.Println("s", s) + fmt.Println("s.StatusCode", s.StatusCode) + + assert.Nil(err) + MatchMessageCounts(ret, 5, 0, ch1, ch2, assert) + + queryParam := map[string]string{ + "q1": "v1", + "q2": "v2", + } + + ret3, _, err := pn.MessageCounts(). + Channels([]string{ch1, ch2}). + Timetoken(strconv.FormatInt(timestamp1, 10)). + QueryParam(queryParam). + Execute() + + MatchMessageCounts(ret3, 10, 6, ch1, ch2, assert) + assert.Nil(err) + + ret1, _, err1 := pn.MessageCountsWithContext(backgroundContext). + Channels([]string{ch1, ch2}). + Timetoken(strconv.FormatInt(timestamp2, 10)). + QueryParam(queryParam). + Execute() + + assert.Nil(err1) + + MatchMessageCounts(ret1, 5, 1, ch1, ch2, assert) + +}