diff --git a/config/logs/source.go b/config/logs/source.go index 28d1d3fb..7ad50ad0 100644 --- a/config/logs/source.go +++ b/config/logs/source.go @@ -48,6 +48,8 @@ type LogSource struct { // LatencyStats tracks internal stats on the time spent by messages from this source in a processing pipeline, i.e. // the duration between when a message is decoded by the tailer/listener/decoder and when the message is handled by a sender LatencyStats *StatsTracker + // Determine if it is a containerd CRI environment, if yes , containerdFlg is Y + containerdFlg string } // NewLogSource creates a new log source. @@ -104,6 +106,20 @@ func (s *LogSource) GetSourceType() SourceType { return s.sourceType } +// SetcontainerdFlg sets a format that give information on Determine if it is a containerd CRI environment, if yes , containerdFlg is Y +func (s *LogSource) SetcontainerdFlg(containerdFlg string) { + s.lock.Lock() + s.containerdFlg = containerdFlg + s.lock.Unlock() +} + +// GetcontainerdFlg returns the containerdFlg used by this source +func (s *LogSource) GetcontainerdFlg() string { + s.lock.Lock() + defer s.lock.Unlock() + return s.containerdFlg +} + // RegisterInfo registers some info to display on the status page func (s *LogSource) RegisterInfo(i InfoProvider) { s.lock.Lock() diff --git a/logs/input/file/tailer.go b/logs/input/file/tailer.go index 1d31e507..fb7d0e19 100644 --- a/logs/input/file/tailer.go +++ b/logs/input/file/tailer.go @@ -75,8 +75,13 @@ func NewDecoderFromSourceWithPattern(source *logsconfig.LogSource, multiLinePatt switch source.GetSourceType() { // TODO case logsconfig.KubernetesSourceType: - lineParser = kubernetes.JSONParser - matcher = &decoder.NewLineMatcher{} + if source.GetcontainerdFlg() == "Y" { + lineParser = kubernetes.Parser + matcher = &decoder.NewLineMatcher{} + } else { + lineParser = kubernetes.JSONParser + matcher = &decoder.NewLineMatcher{} + } // case logsconfig.DockerSourceType: // lineParser = docker.JSONParser // matcher = &decoder.NewLineMatcher{} diff --git a/logs/input/kubernetes/launcher.go b/logs/input/kubernetes/launcher.go index 52cc0d0f..54e1d470 100644 --- a/logs/input/kubernetes/launcher.go +++ b/logs/input/kubernetes/launcher.go @@ -200,6 +200,10 @@ func (l *Launcher) addSource(svc *service.Service) { // force setting source type to kubernetes source.SetSourceType(logsconfig.KubernetesSourceType) + // Determine whether CRI uses containerd + if len(pod.Status.Containers) > 0 && len(pod.Status.Containers[0].ID) >= 13 && pod.Status.Containers[0].ID[:13] == "containerd://" { + source.SetcontainerdFlg("Y") + } l.sourcesByContainer[svc.GetEntityID()] = source l.sources.AddSource(source) @@ -297,6 +301,7 @@ func buildTags(pod *kubernetes.Pod, container kubernetes.ContainerStatus) []stri fmt.Sprintf("kubernetes.pod_id=%s", pod.Metadata.UID), fmt.Sprintf("kubernetes.pod_name=%s", pod.Metadata.Name), fmt.Sprintf("kubernetes.host=%s", pod.Spec.NodeName), + fmt.Sprintf("kubernetes.pod_ip=%s", pod.Status.PodIP), fmt.Sprintf("kubernetes.container_id=%s", container.ID), fmt.Sprintf("kubernetes.container_name=%s", container.Name), fmt.Sprintf("kubernetes.container_image=%s", container.Image), diff --git a/logs/util/kubernetes/kubelet/kubelet_client.go b/logs/util/kubernetes/kubelet/kubelet_client.go index 3f7fc2ad..299467f4 100644 --- a/logs/util/kubernetes/kubelet/kubelet_client.go +++ b/logs/util/kubernetes/kubelet/kubelet_client.go @@ -22,6 +22,7 @@ import ( "time" coreconfig "flashcat.cloud/categraf/config" + "flashcat.cloud/categraf/logs/util" "flashcat.cloud/categraf/logs/util/kubernetes" ) @@ -134,7 +135,9 @@ func (kc *kubeletClient) query(ctx context.Context, path string) ([]byte, int, e return nil, 0, err } - log.Printf("Successfully queried %s, status code: %d, body len: %d", req.URL.String(), response.StatusCode, len(b)) + if util.Debug() { + log.Printf("Successfully queried %s, status code: %d, body len: %d", req.URL.String(), response.StatusCode, len(b)) + } return b, response.StatusCode, nil }