From bc52affc8bf3f38cf05319d0c913a244fee7a04f Mon Sep 17 00:00:00 2001 From: Meepoljdx <278914323@qq.com> Date: Fri, 24 Nov 2023 18:01:17 +0800 Subject: [PATCH 1/2] add timeout config for ntp input plugin --- conf/input.ntp/ntp.toml | 3 ++ inputs/ntp/ntp.go | 4 +- inputs/ntp/ntp_test.go | 24 +++++++++++ inputs/ntp/nux.go | 88 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+), 2 deletions(-) create mode 100644 inputs/ntp/ntp_test.go create mode 100644 inputs/ntp/nux.go diff --git a/conf/input.ntp/ntp.toml b/conf/input.ntp/ntp.toml index 2bd00eb6..a3329696 100644 --- a/conf/input.ntp/ntp.toml +++ b/conf/input.ntp/ntp.toml @@ -3,3 +3,6 @@ # # ntp servers # ntp_servers = ["ntp.aliyun.com"] + +# # response time out seconds +# timeout = 5 diff --git a/inputs/ntp/ntp.go b/inputs/ntp/ntp.go index 0aaf1af4..5f5b95a6 100644 --- a/inputs/ntp/ntp.go +++ b/inputs/ntp/ntp.go @@ -7,7 +7,6 @@ import ( "flashcat.cloud/categraf/config" "flashcat.cloud/categraf/inputs" "flashcat.cloud/categraf/types" - "github.com/toolkits/pkg/nux" ) const inputName = "ntp" @@ -15,6 +14,7 @@ const inputName = "ntp" type NTPStat struct { config.PluginConfig NTPServers []string `toml:"ntp_servers"` + TimeOut int64 `toml:"timeout"` server string } @@ -46,7 +46,7 @@ func (n *NTPStat) Gather(slist *types.SampleList) { } orgTime := time.Now() - serverReciveTime, serverTransmitTime, err := nux.NtpTwoTime(n.server) + serverReciveTime, serverTransmitTime, err := getTwoTime(n.server, 4, n.TimeOut) if err != nil { log.Println("E! failed to connect ntp server:", n.server, "error:", err) n.server = "" diff --git a/inputs/ntp/ntp_test.go b/inputs/ntp/ntp_test.go new file mode 100644 index 00000000..2efd1fcb --- /dev/null +++ b/inputs/ntp/ntp_test.go @@ -0,0 +1,24 @@ +package ntp + +import ( + "log" + "testing" + "time" +) + +func TestGetTwoTime(t *testing.T) { + orgTime := time.Now() + log.Println("Begin") + serverReciveTime, serverTransmitTime, err := getTwoTime("ntp1.aliyun.com", 4, 20) + if err != nil { + log.Println(err) + return + } + dstTime := time.Now() + + // https://en.wikipedia.org/wiki/Network_Time_Protocol + duration := ((serverReciveTime.UnixNano() - orgTime.UnixNano()) + (serverTransmitTime.UnixNano() - dstTime.UnixNano())) / 2 + + delta := duration / 1e6 // convert to ms + log.Println(delta) +} diff --git a/inputs/ntp/nux.go b/inputs/ntp/nux.go new file mode 100644 index 00000000..ba530ccc --- /dev/null +++ b/inputs/ntp/nux.go @@ -0,0 +1,88 @@ +package ntp + +import ( + "encoding/binary" + "net" + "time" +) + +type mode byte + +const ( + reserved mode = 0 + iota + symmetricActive + symmetricPassive + client + server + broadcast + controlMessage + reservedPrivate +) + +type ntpTime struct { + Seconds uint32 + Fraction uint32 +} + +func (t ntpTime) UTC() time.Time { + nsec := uint64(t.Seconds)*1e9 + (uint64(t.Fraction) * 1e9 >> 32) + return time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(nsec)) +} + +type msg struct { + LiVnMode byte // Leap Indicator (2) + Version (3) + Mode (3) + Stratum byte + Poll byte + Precision byte + RootDelay uint32 + RootDispersion uint32 + ReferenceId uint32 + ReferenceTime ntpTime + OriginTime ntpTime + ReceiveTime ntpTime + TransmitTime ntpTime +} + +func (m *msg) SetVersion(v byte) { + m.LiVnMode = (m.LiVnMode & 0xc7) | v<<3 +} + +// SetMode sets the NTP protocol mode on the message. +func (m *msg) SetMode(md mode) { + m.LiVnMode = (m.LiVnMode & 0xf8) | byte(md) +} + +func getTwoTime(host string, version byte, timeout int64) (time.Time, time.Time, error) { + if version < 2 || version > 4 { + panic("ntp: invalid version number") + } + + raddr, err := net.ResolveUDPAddr("udp", host+":123") + if err != nil { + return time.Now(), time.Now(), err + } + + con, err := net.DialUDP("udp", nil, raddr) + if err != nil { + return time.Now(), time.Now(), err + } + defer con.Close() + con.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second)) + m := new(msg) + m.SetMode(client) + m.SetVersion(version) + + err = binary.Write(con, binary.BigEndian, m) + if err != nil { + return time.Now(), time.Now(), err + } + + err = binary.Read(con, binary.BigEndian, m) + if err != nil { + return time.Now(), time.Now(), err + } + + t := m.ReceiveTime.UTC().Local() + transmitTime := m.TransmitTime.UTC().Local() + return t, transmitTime, nil +} From 27b271965e32b949791def988f5e0cf2ae3fa1c5 Mon Sep 17 00:00:00 2001 From: Meepoljdx <278914323@qq.com> Date: Thu, 30 Nov 2023 18:02:51 +0800 Subject: [PATCH 2/2] Update github.com/toolkits/pkg to v.1.3.7 to use ntp timeout param --- go.mod | 2 +- inputs/ntp/ntp.go | 9 ++++- inputs/ntp/ntp_test.go | 4 +- inputs/ntp/nux.go | 88 ------------------------------------------ 4 files changed, 11 insertions(+), 92 deletions(-) delete mode 100644 inputs/ntp/nux.go diff --git a/go.mod b/go.mod index e6b9066c..85191795 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/shirou/gopsutil/v3 v3.22.5 github.com/sirupsen/logrus v1.8.1 // indirect github.com/stretchr/testify v1.8.3 - github.com/toolkits/pkg v1.3.5 + github.com/toolkits/pkg v1.3.7 github.com/ulricqin/gosnmp v0.0.1 github.com/xdg/scram v1.0.5 go.mongodb.org/mongo-driver v1.9.1 diff --git a/inputs/ntp/ntp.go b/inputs/ntp/ntp.go index 5f5b95a6..95914c49 100644 --- a/inputs/ntp/ntp.go +++ b/inputs/ntp/ntp.go @@ -7,6 +7,8 @@ import ( "flashcat.cloud/categraf/config" "flashcat.cloud/categraf/inputs" "flashcat.cloud/categraf/types" + + "github.com/toolkits/pkg/nux" ) const inputName = "ntp" @@ -20,7 +22,10 @@ type NTPStat struct { func init() { inputs.Add(inputName, func() inputs.Input { - return &NTPStat{} + return &NTPStat{ + // default timeout is 5 seconds + TimeOut: 5, + } }) } @@ -46,7 +51,7 @@ func (n *NTPStat) Gather(slist *types.SampleList) { } orgTime := time.Now() - serverReciveTime, serverTransmitTime, err := getTwoTime(n.server, 4, n.TimeOut) + serverReciveTime, serverTransmitTime, err := nux.NtpTwoTime(n.server, n.TimeOut) if err != nil { log.Println("E! failed to connect ntp server:", n.server, "error:", err) n.server = "" diff --git a/inputs/ntp/ntp_test.go b/inputs/ntp/ntp_test.go index 2efd1fcb..c1f15043 100644 --- a/inputs/ntp/ntp_test.go +++ b/inputs/ntp/ntp_test.go @@ -4,12 +4,14 @@ import ( "log" "testing" "time" + + "github.com/toolkits/pkg/nux" ) func TestGetTwoTime(t *testing.T) { orgTime := time.Now() log.Println("Begin") - serverReciveTime, serverTransmitTime, err := getTwoTime("ntp1.aliyun.com", 4, 20) + serverReciveTime, serverTransmitTime, err := nux.NtpTwoTime("ntp1.aliyun.com", 20) if err != nil { log.Println(err) return diff --git a/inputs/ntp/nux.go b/inputs/ntp/nux.go deleted file mode 100644 index ba530ccc..00000000 --- a/inputs/ntp/nux.go +++ /dev/null @@ -1,88 +0,0 @@ -package ntp - -import ( - "encoding/binary" - "net" - "time" -) - -type mode byte - -const ( - reserved mode = 0 + iota - symmetricActive - symmetricPassive - client - server - broadcast - controlMessage - reservedPrivate -) - -type ntpTime struct { - Seconds uint32 - Fraction uint32 -} - -func (t ntpTime) UTC() time.Time { - nsec := uint64(t.Seconds)*1e9 + (uint64(t.Fraction) * 1e9 >> 32) - return time.Date(1900, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(nsec)) -} - -type msg struct { - LiVnMode byte // Leap Indicator (2) + Version (3) + Mode (3) - Stratum byte - Poll byte - Precision byte - RootDelay uint32 - RootDispersion uint32 - ReferenceId uint32 - ReferenceTime ntpTime - OriginTime ntpTime - ReceiveTime ntpTime - TransmitTime ntpTime -} - -func (m *msg) SetVersion(v byte) { - m.LiVnMode = (m.LiVnMode & 0xc7) | v<<3 -} - -// SetMode sets the NTP protocol mode on the message. -func (m *msg) SetMode(md mode) { - m.LiVnMode = (m.LiVnMode & 0xf8) | byte(md) -} - -func getTwoTime(host string, version byte, timeout int64) (time.Time, time.Time, error) { - if version < 2 || version > 4 { - panic("ntp: invalid version number") - } - - raddr, err := net.ResolveUDPAddr("udp", host+":123") - if err != nil { - return time.Now(), time.Now(), err - } - - con, err := net.DialUDP("udp", nil, raddr) - if err != nil { - return time.Now(), time.Now(), err - } - defer con.Close() - con.SetDeadline(time.Now().Add(time.Duration(timeout) * time.Second)) - m := new(msg) - m.SetMode(client) - m.SetVersion(version) - - err = binary.Write(con, binary.BigEndian, m) - if err != nil { - return time.Now(), time.Now(), err - } - - err = binary.Read(con, binary.BigEndian, m) - if err != nil { - return time.Now(), time.Now(), err - } - - t := m.ReceiveTime.UTC().Local() - transmitTime := m.TransmitTime.UTC().Local() - return t, transmitTime, nil -}