Skip to content

Commit

Permalink
Merge pull request #948 from jinlinGuan/issue-943
Browse files Browse the repository at this point in the history
feat: add ZeroMQ channel and update MQTT channel
  • Loading branch information
cloudxxx8 authored Nov 18, 2024
2 parents 9c7b7a3 + bbd1843 commit c0ce723
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 21 deletions.
10 changes: 7 additions & 3 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,13 @@ const (
// Constants for Address
const (
// Type
REST = "REST"
MQTT = "MQTT"
EMAIL = "EMAIL"
REST = "REST"
MQTT = "MQTT"
EMAIL = "EMAIL"
ZeroMQ = "ZeroMQ"
HTTP = "http"
TCP = "tcp"
TCPS = "tcps"
)

// Constants for SMA Operation Action
Expand Down
94 changes: 82 additions & 12 deletions dtos/address.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2021 IOTech Ltd
// Copyright (C) 2021-2024 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

Expand All @@ -12,14 +12,18 @@ import (
)

type Address struct {
Type string `json:"type" validate:"oneof='REST' 'MQTT' 'EMAIL'"`
Type string `json:"type" validate:"oneof='REST' 'MQTT' 'EMAIL' 'ZeroMQ'"`

Host string `json:"host" validate:"required_unless=Type EMAIL"`
Port int `json:"port" validate:"required_unless=Type EMAIL"`
Scheme string `json:"scheme,omitempty"`
Host string `json:"host,omitempty" validate:"required_unless=Type EMAIL"`
Port int `json:"port,omitempty" validate:"required_unless=Type EMAIL"`

RESTAddress `json:",inline" validate:"-"`
MQTTPubAddress `json:",inline" validate:"-"`
EmailAddress `json:",inline" validate:"-"`
ZeroMQAddress `json:",inline" validate:"-"`
MessageBus `json:",inline" validate:"-"`
Security `json:",inline" validate:"-"`
}

// Validate satisfies the Validator interface
Expand All @@ -39,6 +43,15 @@ func (a *Address) Validate() error {
if err != nil {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "invalid MQTTPubAddress.", err)
}
err = common.Validate(a.MessageBus)
if err != nil {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "invalid MQTTPubAddress.", err)
}
case common.ZeroMQ:
err = common.Validate(a.MessageBus)
if err != nil {
return errors.NewCommonEdgeX(errors.KindContractInvalid, "invalid ZeroMQAddress.", err)
}
case common.EMAIL:
err = common.Validate(a.EmailAddress)
if err != nil {
Expand Down Expand Up @@ -68,7 +81,6 @@ func NewRESTAddress(host string, port int, httpMethod string) Address {

type MQTTPubAddress struct {
Publisher string `json:"publisher,omitempty" validate:"required"`
Topic string `json:"topic,omitempty" validate:"required"`
QoS int `json:"qos,omitempty"`
KeepAlive int `json:"keepAlive,omitempty"`
Retained bool `json:"retained,omitempty"`
Expand All @@ -83,7 +95,25 @@ func NewMQTTAddress(host string, port int, publisher string, topic string) Addre
Port: port,
MQTTPubAddress: MQTTPubAddress{
Publisher: publisher,
Topic: topic,
},
MessageBus: MessageBus{Topic: topic},
}
}

func NewMQTTAddressWithSecurity(scheme string, host string, port int, publisher string, topic string, authMode string, secretPath string, skipCertVerify bool) Address {
return Address{
Type: common.MQTT,
Scheme: scheme,
Host: host,
Port: port,
MQTTPubAddress: MQTTPubAddress{
Publisher: publisher,
},
MessageBus: MessageBus{Topic: topic},
Security: Security{
AuthMode: authMode,
SecretPath: secretPath,
SkipCertVerify: skipCertVerify,
},
}
}
Expand All @@ -101,6 +131,26 @@ func NewEmailAddress(recipients []string) Address {
}
}

type MessageBus struct {
Topic string `json:"topic,omitempty" validate:"required"`
}

type Security struct {
SecretPath string `json:"secretPath,omitempty" validate:"required"`
AuthMode string `json:"authMode,omitempty" validate:"required,oneof='none' 'usernamepassword' 'cacert' 'clientcert'"`
SkipCertVerify bool `json:"skipCertVerify,omitempty"`
}

type ZeroMQAddress struct {
}

func NewZeroMQAddress(topic string) Address {
return Address{
Type: common.ZeroMQ,
MessageBus: MessageBus{Topic: topic},
}
}

func ToAddressModel(a Address) models.Address {
var address models.Address

Expand All @@ -117,16 +167,28 @@ func ToAddressModel(a Address) models.Address {
case common.MQTT:
address = models.MQTTPubAddress{
BaseAddress: models.BaseAddress{
Type: a.Type, Host: a.Host, Port: a.Port,
Type: a.Type, Scheme: a.Scheme, Host: a.Host, Port: a.Port,
},
Security: models.Security{
SecretPath: a.SecretPath,
AuthMode: a.AuthMode,
SkipCertVerify: a.SkipCertVerify,
},
MessageBus: models.MessageBus{Topic: a.Topic},
Publisher: a.MQTTPubAddress.Publisher,
Topic: a.MQTTPubAddress.Topic,
QoS: a.QoS,
KeepAlive: a.KeepAlive,
Retained: a.Retained,
AutoReconnect: a.AutoReconnect,
ConnectTimeout: a.ConnectTimeout,
}
case common.ZeroMQ:
address = models.ZeroMQAddress{
BaseAddress: models.BaseAddress{
Type: a.Type, Host: a.Host, Port: a.Port,
},
MessageBus: models.MessageBus{Topic: a.Topic},
}
case common.EMAIL:
address = models.EmailAddress{
BaseAddress: models.BaseAddress{
Expand All @@ -140,9 +202,10 @@ func ToAddressModel(a Address) models.Address {

func FromAddressModelToDTO(address models.Address) Address {
dto := Address{
Type: address.GetBaseAddress().Type,
Host: address.GetBaseAddress().Host,
Port: address.GetBaseAddress().Port,
Type: address.GetBaseAddress().Type,
Scheme: address.GetBaseAddress().Scheme,
Host: address.GetBaseAddress().Host,
Port: address.GetBaseAddress().Port,
}

switch a := address.(type) {
Expand All @@ -155,13 +218,20 @@ func FromAddressModelToDTO(address models.Address) Address {
case models.MQTTPubAddress:
dto.MQTTPubAddress = MQTTPubAddress{
Publisher: a.Publisher,
Topic: a.Topic,
QoS: a.QoS,
KeepAlive: a.KeepAlive,
Retained: a.Retained,
AutoReconnect: a.AutoReconnect,
ConnectTimeout: a.ConnectTimeout,
}
dto.MessageBus = MessageBus{Topic: a.Topic}
dto.Security = Security{
SecretPath: a.SecretPath,
AuthMode: a.AuthMode,
SkipCertVerify: a.SkipCertVerify,
}
case models.ZeroMQAddress:
dto.MessageBus = MessageBus{Topic: a.Topic}
case models.EmailAddress:
dto.EmailAddress = EmailAddress{
Recipients: a.Recipients,
Expand Down
8 changes: 5 additions & 3 deletions dtos/address_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ var testMQTTPubAddress = Address{
Port: testPort,
MQTTPubAddress: MQTTPubAddress{
Publisher: testPublisher,
Topic: testTopic,
},
MessageBus: MessageBus{Topic: testTopic},
}

var testEmailAddress = Address{
Expand Down Expand Up @@ -195,7 +195,9 @@ func TestAddress_marshalJSON(t *testing.T) {
Host: testHost, Port: testPort,
MQTTPubAddress: MQTTPubAddress{
Publisher: testPublisher,
Topic: testTopic,
},
MessageBus: MessageBus{
testTopic,
},
}
expectedMQTTJsonStr := fmt.Sprintf(
Expand All @@ -209,7 +211,7 @@ func TestAddress_marshalJSON(t *testing.T) {
},
}
expectedEmailJsonStr := fmt.Sprintf(
`{"type":"%s","host":"","port":0,"recipients":["%s"]}`,
`{"type":"%s","recipients":["%s"]}`,
emailAddress.Type, emailAddress.Recipients[0],
)

Expand Down
40 changes: 37 additions & 3 deletions models/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ func unmarshalAddress(b []byte) (address Address, err error) {
return address, errors.NewCommonEdgeX(errors.KindContractInvalid, "Failed to unmarshal Email address.", err)
}
address = mail
case common.ZeroMQ:
var zeromq ZeroMQAddress
if err = json.Unmarshal(b, &zeromq); err != nil {
return address, errors.NewCommonEdgeX(errors.KindContractInvalid, "Failed to unmarshal ZeroMQ address.", err)
}
address = zeromq
default:
return address, errors.NewCommonEdgeX(errors.KindContractInvalid, "Unsupported address type", err)
}
Expand All @@ -63,8 +69,27 @@ type BaseAddress struct {
Type string

// Common properties
Host string
Port int
Scheme string // Scheme indicates the scheme of the URI, see https://en.wikipedia.org/wiki/Uniform_Resource_Identifier#Syntax
Host string
Port int
}

// Security is a base struct contains the security related fields.
type Security struct {
// SecretPath is the name of the path in secret provider to retrieve your secrets. Must be non-blank.
SecretPath string
// AuthMode indicates what to use when connecting to the broker.
// Options are "none", "cacert" , "usernamepassword", "clientcert".
// If a CA Cert exists in the SecretPath then it will be used for
// all modes except "none".
AuthMode string
// SkipCertVerify indicates if the server certificate verification should be skipped
SkipCertVerify bool
}

// MessageBus is a base struct contains the messageBus related fields.
type MessageBus struct {
Topic string
}

// RESTAddress is a REST specific struct
Expand All @@ -80,8 +105,9 @@ func (a RESTAddress) GetBaseAddress() BaseAddress { return a.BaseAddress }
// MQTTPubAddress is a MQTT specific struct
type MQTTPubAddress struct {
BaseAddress
MessageBus
Security
Publisher string
Topic string
QoS int
KeepAlive int
Retained bool
Expand All @@ -98,3 +124,11 @@ type EmailAddress struct {
}

func (a EmailAddress) GetBaseAddress() BaseAddress { return a.BaseAddress }

// ZeroMQAddress is a ZeroMQ specific struct
type ZeroMQAddress struct {
BaseAddress
MessageBus
}

func (a ZeroMQAddress) GetBaseAddress() BaseAddress { return a.BaseAddress }

0 comments on commit c0ce723

Please sign in to comment.