Skip to content

Commit

Permalink
add CLIENT/SERVER draft sockets (zeromq#274)
Browse files Browse the repository at this point in the history
Co-authored-by: Pavol Vargovcik <[email protected]>
  • Loading branch information
p4l1ly and p4l1ly authored Apr 10, 2020
1 parent e2dbde6 commit 224a2d1
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 0 deletions.
85 changes: 85 additions & 0 deletions sock_draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,38 @@ package goczmq

/*
#include "czmq.h"
int Sock_sendserverframe(
zsock_t *sock,
const void *data,
size_t size,
int flags,
uint32_t routing_id
) {
zframe_t *frame = zframe_new (data, size);
assert(frame != NULL);
zframe_set_routing_id(frame, routing_id);
int rc = zframe_send (&frame, sock, flags);
return rc;
}
*/
import "C"

import (
"unsafe"
)

const (
// Scatter is a ZMQ_SCATTER socket type
Scatter = int(C.ZMQ_SCATTER)

// Gather is a ZMQ_GATHER socket type
Gather = int(C.ZMQ_GATHER)

// Client is a ZMQ_CLIENT socket type
Client = int(C.ZMQ_CLIENT)

// Gather is a ZMQ_SERVER socket type
Server = int(C.ZMQ_SERVER)
)

// NewGather creates a Gather socket and calls Attach.
Expand All @@ -28,3 +51,65 @@ func NewScatter(endpoints string) (*Sock, error) {
s := NewSock(Scatter)
return s, s.Attach(endpoints, false)
}

// NewServer creates a Server socket and calls Attach.
// The socket will Bind by default.
func NewServer(endpoints string) (*Sock, error) {
s := NewSock(Server)
return s, s.Attach(endpoints, true)
}

// NewClient creates a Client socket and calls Attach.
// The socket will Connect by default.
func NewClient(endpoints string) (*Sock, error) {
s := NewSock(Client)
return s, s.Attach(endpoints, false)
}

// RecvServerFrame reads a frame from the socket and returns it
// as a byte array, along with a more flag, routing ID and error
// (if there is an error)
func (s *Sock) RecvServerFrame() ([]byte, uint32, error) {
if s.zsockT == nil {
return nil, 0, ErrRecvFrameAfterDestroy
}

frame := C.zframe_recv(unsafe.Pointer(s.zsockT))
if frame == nil {
return []byte{0}, 0, ErrRecvFrame
}
dataSize := C.zframe_size(frame)
dataPtr := C.zframe_data(frame)
b := C.GoBytes(unsafe.Pointer(dataPtr), C.int(dataSize))
var routing_id C.uint32_t = C.zframe_routing_id(frame)
C.zframe_destroy(&frame)
return b, uint32(routing_id), nil
}

// SendFrame sends a byte array via the socket. For the flags
// value, use FlagNone (0) for a single message, or FlagMore if it is
// a multi-part message
func (s *Sock) SendServerFrame(data []byte, routing_id uint32) error {
var rc C.int
if len(data) == 0 {
rc = C.Sock_sendserverframe(
s.zsockT,
nil,
C.size_t(0),
C.int(FlagNone),
C.uint32_t(routing_id),
)
} else {
rc = C.Sock_sendserverframe(
s.zsockT,
unsafe.Pointer(&data[0]),
C.size_t(len(data)),
C.int(FlagNone),
C.uint32_t(routing_id),
)
}
if rc == C.int(-1) {
return ErrSendFrame
}
return nil
}
55 changes: 55 additions & 0 deletions sock_draft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,58 @@ func benchmarkScatterGather(size int, b *testing.B) {
func BenchmarkScatterGather1k(b *testing.B) { benchmarkScatterGather(1024, b) }
func BenchmarkScatterGather4k(b *testing.B) { benchmarkScatterGather(4096, b) }
func BenchmarkScatterGather16k(b *testing.B) { benchmarkScatterGather(16384, b) }

func TestClientServer(t *testing.T) {
bogusClient, err := NewClient("bogus://bogus")
if err == nil {
t.Error(err)
}
defer bogusClient.Destroy()

bogusServer, err := NewServer("bogus://bogus")
if err == nil {
t.Error(err)
}
defer bogusServer.Destroy()

client, err := NewClient("inproc://server")
if err != nil {
t.Error(err)
}
defer client.Destroy()

server, err := NewServer("inproc://server")
if err != nil {
t.Error(err)
}
defer server.Destroy()

err = client.SendFrame([]byte("Hello World"), FlagNone)
if err != nil {
t.Error(err)
}

frame, routing_id, err := server.RecvServerFrame()
if err != nil {
t.Error(err)
}
t.Logf("routing_id %d", routing_id)

if want, have := "Hello World", string(frame); want != have {
t.Errorf("want %#v, have %#v", want, have)
}

err = server.SendServerFrame([]byte("Hi World"), routing_id)
if err != nil {
t.Error(err)
}

frame, _, err = client.RecvFrame()
if err != nil {
t.Error(err)
}

if want, have := "Hi World", string(frame); want != have {
t.Errorf("want %#v, have %#v", want, have)
}
}

0 comments on commit 224a2d1

Please sign in to comment.