Skip to content

Commit

Permalink
修复kafka空指针的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
Dot-Liu committed Apr 3, 2023
2 parents 587a9e6 + a55c106 commit 81f2e4d
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 8 deletions.
8 changes: 4 additions & 4 deletions drivers/output/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type Config struct {
Topic string `json:"topic" yaml:"topic" label:"Topic"`
Address string `json:"address" yaml:"address" label:"请求地址"`
Timeout int `json:"timeout" yaml:"timeout" label:"超时时间"`
Version string `json:"version" yaml:"version" label:"版本"`
PartitionType string `json:"partition_type" yaml:"partition_type"`
Partition int32 `json:"partition" yaml:"partition"`
PartitionKey string `json:"partition_key" yaml:"partition_key"`
Version string `json:"version" yaml:"version" label:"版本" default:"1.0.0.0" enum:"0.8.2.0, 0.8.2.1, 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1, 0.11.0.2, 1.0.0.0, 1.0.1.0, 1.0.2.0, 1.1.0.0, 1.1.1.0, 2.0.0.0, 2.0.1.0, 2.1.0.0, 2.1.1.0, 2.2.0.0, 2.2.1.0, 2.2.2.0, 2.3.0.0, 2.3.1.0, 2.4.0.0, 2.4.1.0, 2.5.0.0, 2.5.1.0, 2.6.0.0, 2.6.1.0, 2.6.2.0, 2.7.0.0, 2.7.1.0, 2.8.0.0, 2.8.1.0, 3.0.0.0, 3.1.0.0"`
PartitionType string `json:"partition_type" yaml:"partition_type" enum:"robin,hash,manual,random"`
Partition int32 `json:"partition" yaml:"partition" switch:"partition_type==='manual'"`
PartitionKey string `json:"partition_key" yaml:"partition_key" switch:"partition_type==='hash'"`
Type string `json:"type" yaml:"type" enum:"json,line" label:"输出格式"`
Formatter eosc.FormatterConfig `json:"formatter" yaml:"formatter" label:"格式化配置"`
}
Expand Down
5 changes: 2 additions & 3 deletions drivers/output/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type tProducer struct {
conf *ProducerConfig
cancel context.CancelFunc
enable bool
locker *sync.Mutex
formatter eosc.IFormatter
}

Expand Down Expand Up @@ -101,9 +100,9 @@ func (o *tProducer) write(msg *sarama.ProducerMessage) {
if !o.enable {
return
}
o.locker.Lock()

o.input <- msg
o.locker.Unlock()

}

func (o *tProducer) work() {
Expand Down
2 changes: 1 addition & 1 deletion drivers/plugins/proxy-mirror/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type Config struct {
Addr string `json:"Addr" label:"服务地址" description:"镜像服务地址, 需要包含scheme"`
Addr string `json:"addr" label:"服务地址" description:"镜像服务地址, 需要包含scheme"`
SampleConf *SampleConfig `json:"sample_conf" label:"采样配置"`
Timeout int `json:"timeout" label:"请求超时时间"`
PassHost string `json:"pass_host" enum:"pass,node,rewrite" default:"pass" label:"转发域名" description:"请求发给上游时的 host 设置选型,pass:将客户端的 host 透传给上游,node:使用addr中配置的host,rewrite:使用下面指定的host值"`
Expand Down

0 comments on commit 81f2e4d

Please sign in to comment.