From b165d75dde79604c1b7865f1dbc9b6d9ef4d0420 Mon Sep 17 00:00:00 2001 From: fisherxu Date: Mon, 31 May 2021 22:18:04 +0800 Subject: [PATCH] sync from kubeedge/kubedge repo Signed-off-by: fisherxu --- OWNERS | 1 + pkg/common/util/conn.go | 11 ++++---- pkg/core/context/context_channel.go | 39 ++++++++++++--------------- pkg/core/core.go | 23 ++++++++-------- pkg/core/model/message.go | 42 ++++++++++++++++++++--------- 5 files changed, 64 insertions(+), 52 deletions(-) diff --git a/OWNERS b/OWNERS index 42e5dc9aa..04c130b39 100644 --- a/OWNERS +++ b/OWNERS @@ -14,3 +14,4 @@ reviewers: - kadisi - subpathdev - anyushun + - Iceber diff --git a/pkg/common/util/conn.go b/pkg/common/util/conn.go index 25312aa87..697e6c74a 100644 --- a/pkg/common/util/conn.go +++ b/pkg/common/util/conn.go @@ -15,13 +15,12 @@ type UnixSocket struct { } // NewUnixSocket create new socket -func NewUnixSocket(filename string, size ...int) *UnixSocket { - size1 := 10480 - if size != nil { - size1 = size[0] +func NewUnixSocket(filename string, sizes ...int) *UnixSocket { + size := 10480 + if len(sizes) != 0 { + size = sizes[0] } - us := UnixSocket{filename: filename, bufsize: size1} - return &us + return &UnixSocket{filename: filename, bufsize: size} } func (us *UnixSocket) createServer() { diff --git a/pkg/core/context/context_channel.go b/pkg/core/context/context_channel.go index dd6a660c8..ca1c939c4 100644 --- a/pkg/core/context/context_channel.go +++ b/pkg/core/context/context_channel.go @@ -160,22 +160,19 @@ func (ctx *ChannelContext) SendResp(message model.Message) { // SendToGroup send msg to modules. Todo: do not stuck func (ctx *ChannelContext) SendToGroup(moduleType string, message model.Message) { - // avoid exception because of channel closing - // TODO: need reconstruction - defer func() { - if exception := recover(); exception != nil { - klog.Warningf("Recover when sendToGroup message, exception: %+v", exception) - } - }() - send := func(ch chan model.Message) { + // avoid exception because of channel closing + // TODO: need reconstruction + defer func() { + if exception := recover(); exception != nil { + klog.Warningf("Recover when sendToGroup message, exception: %+v", exception) + } + }() select { case ch <- message: default: klog.Warningf("the message channel is full, message: %+v", message) - select { - case ch <- message: - } + ch <- message } } if channelList := ctx.getTypeChannel(moduleType); channelList != nil { @@ -190,14 +187,6 @@ func (ctx *ChannelContext) SendToGroup(moduleType string, message model.Message) // SendToGroupSync : broadcast the message to echo module channel, the module send response back anon channel // check timeout and the size of anon channel func (ctx *ChannelContext) SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error { - // avoid exception because of channel closing - // TODO: need reconstruction - defer func() { - if exception := recover(); exception != nil { - klog.Warningf("Recover when sendToGroupsync message, exception: %+v", exception) - } - }() - if timeout <= 0 { timeout = MessageTimeoutDefault } @@ -242,6 +231,13 @@ func (ctx *ChannelContext) SendToGroupSync(moduleType string, message model.Mess var timeoutCounter int32 send := func(ch chan model.Message) { + // avoid exception because of channel closing + // TODO: need reconstruction + defer func() { + if exception := recover(); exception != nil { + klog.Warningf("Recover when sendToGroupsync message, exception: %+v", exception) + } + }() sendTimer := time.NewTimer(time.Until(deadline)) select { case ch <- message: @@ -308,13 +304,12 @@ func (ctx *ChannelContext) addChannel(module string, moduleCh chan model.Message func (ctx *ChannelContext) delChannel(module string) { // delete module channel from channels map ctx.chsLock.Lock() - _, exist := ctx.channels[module] - if !exist { + if _, exist := ctx.channels[module]; !exist { + ctx.chsLock.Unlock() klog.Warningf("Failed to get channel, module:%s", module) return } delete(ctx.channels, module) - ctx.chsLock.Unlock() // delete module channel from typechannels map diff --git a/pkg/core/core.go b/pkg/core/core.go index 47b6abd22..42d5d7770 100644 --- a/pkg/core/core.go +++ b/pkg/core/core.go @@ -16,9 +16,9 @@ func StartModules() { modules := GetModules() for name, module := range modules { - //Init the module + // Init the module beehiveContext.AddModule(name) - //Assemble typeChannels for sendToGroup + // Assemble typeChannels for sendToGroup beehiveContext.AddModuleGroup(name, module.Group()) go module.Start() klog.Infof("Starting module %v", name) @@ -30,16 +30,15 @@ func GracefulShutdown() { c := make(chan os.Signal) signal.Notify(c, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT) - select { - case s := <-c: - klog.Infof("Get os signal %v", s.String()) - //Cleanup each modules - beehiveContext.Cancel() - modules := GetModules() - for name, _ := range modules { - klog.Infof("Cleanup module %v", name) - beehiveContext.Cleanup(name) - } + s := <-c + klog.Infof("Get os signal %v", s.String()) + + // Cleanup each modules + beehiveContext.Cancel() + modules := GetModules() + for name := range modules { + klog.Infof("Cleanup module %v", name) + beehiveContext.Cleanup(name) } } diff --git a/pkg/core/model/message.go b/pkg/core/model/message.go index db53c2247..4df3b076b 100644 --- a/pkg/core/model/message.go +++ b/pkg/core/model/message.go @@ -1,6 +1,8 @@ package model import ( + "encoding/json" + "fmt" "time" uuid "github.com/satori/go.uuid" @@ -15,13 +17,16 @@ const ( ResponseOperation = "response" ResponseErrorOperation = "error" - ResourceTypePod = "pod" - ResourceTypeConfigmap = "configmap" - ResourceTypeSecret = "secret" - ResourceTypeNode = "node" - ResourceTypePodlist = "podlist" - ResourceTypePodStatus = "podstatus" - ResourceTypeNodeStatus = "nodestatus" + ResourceTypePod = "pod" + ResourceTypeConfigmap = "configmap" + ResourceTypeSecret = "secret" + ResourceTypeNode = "node" + ResourceTypePodlist = "podlist" + ResourceTypePodStatus = "podstatus" + ResourceTypeNodeStatus = "nodestatus" + ResourceTypeRule = "rule" + ResourceTypeRuleEndpoint = "ruleendpoint" + ResourceTypeRuleStatus = "rulestatus" ) // Message struct @@ -118,27 +123,40 @@ func (msg *Message) GetID() string { return msg.Header.ID } -//GetParentID returns message parent id +// GetParentID returns message parent id func (msg *Message) GetParentID() string { return msg.Header.ParentID } -//GetTimestamp returns message timestamp +// GetTimestamp returns message timestamp func (msg *Message) GetTimestamp() int64 { return msg.Header.Timestamp } -//GetContent returns message content +// GetContent returns message content func (msg *Message) GetContent() interface{} { return msg.Content } -//GetResourceVersion returns message resource version +// GetContentData returns message content data +func (msg *Message) GetContentData() ([]byte, error) { + if data, ok := msg.Content.([]byte); ok { + return data, nil + } + + data, err := json.Marshal(msg.Content) + if err != nil { + return nil, fmt.Errorf("marshal message content failed: %s", err) + } + return data, nil +} + +// GetResourceVersion returns message resource version func (msg *Message) GetResourceVersion() string { return msg.Header.ResourceVersion } -//UpdateID returns message object updating its ID +// UpdateID returns message object updating its ID func (msg *Message) UpdateID() *Message { msg.Header.ID = uuid.NewV4().String() return msg