Commit 7b6e6e6: update

weiliang-ms committed Dec 2, 2021
1 parent 3483aa0 commit 7b6e6e6
Showing 10 changed files with 73 additions and 1 deletion.
2 changes: 2 additions & 0 deletions cmd/kubelet/app/server.go
@@ -1130,9 +1130,11 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
// start the kubelet
// kubelet main control loop
go k.Run(podCfg.Updates())

// start the kubelet server
// kubelet API endpoint: the kube-apiserver calls this address (port 10250 by default)
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)

1 change: 1 addition & 0 deletions pkg/kubelet/container/ref.go
@@ -35,6 +35,7 @@ var ImplicitContainerPrefix string = "implicitly required container "
// This function will return an error if the provided Pod does not have a selfLink,
// but we expect selfLink to be populated at all call sites for the function.
func GenerateContainerRef(pod *v1.Pod, container *v1.Container) (*v1.ObjectReference, error) {
// Return the container's field path within the pod, e.g. the first initContainer or the second container
fieldPath, err := fieldPath(pod, container)
if err != nil {
// TODO: figure out intelligent way to refer to containers that we implicitly
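For reference, the field path built here follows the ObjectReference convention: "spec.containers{name}" for a named container, or an index form when the container has no name. A standalone sketch of the scheme — the helper below is illustrative, not the upstream fieldPath function:

package main

import "fmt"

// fieldPathFor mimics the naming convention used for container references.
func fieldPathFor(kind, name string, index int) string {
	if name != "" {
		return fmt.Sprintf("spec.%s{%s}", kind, name) // named container
	}
	return fmt.Sprintf("spec.%s[%d]", kind, index) // unnamed: fall back to the index
}

func main() {
	fmt.Println(fieldPathFor("initContainers", "init-db", 0)) // spec.initContainers{init-db}
	fmt.Println(fieldPathFor("containers", "app", 1))         // spec.containers{app}
}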
5 changes: 5 additions & 0 deletions pkg/kubelet/images/image_gc_manager.go
@@ -273,6 +273,7 @@ func (im *realImageGCManager) detectImages(detectTime time.Time) (sets.String, e

func (im *realImageGCManager) GarbageCollect() error {
// Get disk usage on disk holding images.
// Ask the runtime for the status of the filesystem that holds images.
fsStats, err := im.statsProvider.ImageFsStats()
if err != nil {
return err
@@ -300,14 +301,17 @@ func (im *realImageGCManager) GarbageCollect() error {

// If over the max threshold, free enough to place us at the lower threshold.
usagePercent := 100 - int(available*100/capacity)
// e.g. available=10G capacity=100G HighThresholdPercent=85% LowThresholdPercent=80% -> usagePercent=90%
if usagePercent >= im.policy.HighThresholdPercent {
// e.g. amountToFree = 100G*(100-80)/100 - 10G = 10G
amountToFree := capacity*int64(100-im.policy.LowThresholdPercent)/100 - available
klog.Infof("[imageGCManager]: Disk usage on image filesystem is at %d%% which is over the high threshold (%d%%). Trying to free %d bytes down to the low threshold (%d%%).", usagePercent, im.policy.HighThresholdPercent, amountToFree, im.policy.LowThresholdPercent)
freed, err := im.freeSpace(amountToFree, time.Now())
if err != nil {
return err
}

// Compare how much was actually freed with how much we wanted to free.
if freed < amountToFree {
err := fmt.Errorf("failed to garbage collect required amount of images. Wanted to free %d bytes, but freed %d bytes", amountToFree, freed)
im.recorder.Eventf(im.nodeRef, v1.EventTypeWarning, events.FreeDiskSpaceFailed, err.Error())
@@ -367,6 +371,7 @@ func (im *realImageGCManager) freeSpace(bytesToFree int64, freeTime time.Time) (
// Avoid garbage collect the image if the image is not old enough.
// In such a case, the image may have just been pulled down, and will be used by a container right away.

// Check whether the image has reached the reclaim age (is old enough to be freed).
if freeTime.Sub(image.firstDetected) < im.policy.MinAge {
klog.V(5).Infof("Image ID %s has age %v which is less than the policy's minAge of %v, not eligible for garbage collection", image.id, freeTime.Sub(image.firstDetected), im.policy.MinAge)
continue
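A standalone, worked version of the threshold arithmetic above, plugging in the numbers from the inline comments (illustration only, not the manager's code):

package main

import "fmt"

func main() {
	var capacity, available int64 = 100 << 30, 10 << 30 // 100Gi disk, 10Gi free
	highThresholdPercent, lowThresholdPercent := 85, 80

	usagePercent := 100 - int(available*100/capacity) // 90%
	if usagePercent >= highThresholdPercent {
		// Free enough to bring usage back down to the low threshold:
		// 100Gi*20/100 - 10Gi = 10Gi must be reclaimed.
		amountToFree := capacity*int64(100-lowThresholdPercent)/100 - available
		fmt.Printf("usage %d%% >= %d%%: free %d bytes (%dGi)\n",
			usagePercent, highThresholdPercent, amountToFree, amountToFree>>30)
	}
}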
41 changes: 41 additions & 0 deletions pkg/kubelet/kubelet.go
@@ -392,6 +392,7 @@ func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return err
}

// When the runtime is docker or cri-o, use cAdvisor to collect container stats.
kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint)

return nil
@@ -1288,6 +1289,7 @@ func (kl *Kubelet) setupDataDirs() error {
// StartGarbageCollection starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
loggedContainerGCFailure := false
// Run container garbage collection every minute.
go wait.Until(func() {
if err := kl.containerGC.GarbageCollect(); err != nil {
klog.Errorf("Container garbage collection failed: %v", err)
@@ -1310,6 +1312,7 @@ func (kl *Kubelet) StartGarbageCollection() {
return
}

// Run image garbage collection every five minutes.
prevImageGCFailed := false
go wait.Until(func() {
if err := kl.imageManager.GarbageCollect(); err != nil {
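Both loops are built on wait.Until from k8s.io/apimachinery, which runs a function every period until the stop channel closes. A minimal sketch of the pattern, assuming the upstream periods (ContainerGCPeriod is one minute, ImageGCPeriod five minutes):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	// Container GC pass every minute.
	go wait.Until(func() { fmt.Println("container GC pass") }, time.Minute, stop)
	// Image GC pass every five minutes.
	go wait.Until(func() { fmt.Println("image GC pass") }, 5*time.Minute, stop)

	time.Sleep(10 * time.Minute) // let a few passes run
	close(stop)
}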
@@ -1346,6 +1349,7 @@ func (kl *Kubelet) initializeModules() error {
servermetrics.Register()

// Setup filesystem directories.
// Create /var/lib/kubelet/{device-plugins,plugins,...}
if err := kl.setupDataDirs(); err != nil {
return err
}
@@ -1378,6 +1382,7 @@

// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
func (kl *Kubelet) initializeRuntimeDependentModules() {
// 1. Start cAdvisor.
if err := kl.cadvisor.Start(); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
// TODO(random-liu): Add backoff logic in the babysitter
@@ -1387,7 +1392,9 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
// trigger on-demand stats collection once so that we have capacity information for ephemeral storage.
// ignore any errors, since if stats collection is not successful, the container manager will fail to start below.
kl.StatsProvider.GetCgroupStats("/", true)

// Start container manager.
// 2. Start the container manager.
node, err := kl.getNodeAnyWay()
if err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
@@ -1398,17 +1405,23 @@
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.Fatalf("Failed to start ContainerManager %v", err)
}

// 3. Start the eviction manager.
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)

// container log manager must start after container runtime is up to retrieve information from container runtime
// and inform container to reopen log file after log rotation.

// 4. Start the container log manager.
kl.containerLogManager.Start()
// Adding Registration Callback function for CSI Driver
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for Device Manager
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin manager

// 5. Start the plugin manager.
klog.V(4).Infof("starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
}
@@ -1427,6 +1440,7 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}


if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Fatal(err)
@@ -1438,6 +1452,9 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)

// Quickly report node status at startup (runs only once):
go kl.fastStatusUpdateOnce()

// start syncing lease
@@ -2196,6 +2213,7 @@ func (kl *Kubelet) updateRuntimeUp() {
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()

// gRPC call to fetch the runtime status.
s, err := kl.containerRuntime.Status()
if err != nil {
klog.Errorf("Container runtime sanity check failed: %v", err)
@@ -2205,6 +2223,9 @@
klog.Errorf("Container runtime status is nil")
return
}

// 1. Record the runtime state.

// Periodically log the whole runtime status for debugging.
// TODO(random-liu): Consider to send node event when optional
// condition is unmet.
@@ -2226,7 +2247,9 @@
kl.runtimeState.setRuntimeState(err)
return
}

kl.runtimeState.setRuntimeState(nil)
// 2. Initialize the modules that depend on the runtime.
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
@@ -2286,6 +2309,23 @@ func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID str
// a runtime update and a node status update. Function returns after one successful node status update.
// Function is executed only during Kubelet start which improves latency to ready node by updating
// pod CIDR, runtime status and node statuses ASAP.
/*
fastStatusUpdateOnce starts a loop that checks the kubelet's internal node indexer cache and, once a CIDR
has been applied (node.spec.podCIDRs is non-empty and valid), updates the pod CIDR immediately.
After the pod CIDR is updated, it triggers a runtime update (updateRuntimeUp) and a node status update (syncNodeStatus).
The function runs only during kubelet startup; by pushing the pod CIDR, runtime status, and node status as
early as possible, it shortens the time until the node reports Ready.
node.spec.podCIDRs (see kubectl get node node1 -o yaml) is the IP range available to Pods on this node.
With node.spec.podCIDRs=10.233.64.0/24, the usable pod IP range is:
- first IP: 10.233.64.0
- last IP: 10.233.64.255
- subnet mask: 255.255.255.0
- number of addresses: 256
*/
func (kl *Kubelet) fastStatusUpdateOnce() {
for {
time.Sleep(100 * time.Millisecond)
Expand All @@ -2295,6 +2335,7 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
continue
}
if len(node.Spec.PodCIDRs) != 0 {
// [10.233.64.0/24] -> 10.233.64.0/24
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err)
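As a sanity check on the CIDR arithmetic in the block comment above, a small standalone example using only the standard library:

package main

import (
	"fmt"
	"net"
)

func main() {
	_, ipnet, err := net.ParseCIDR("10.233.64.0/24")
	if err != nil {
		panic(err)
	}
	ones, bits := ipnet.Mask.Size()
	fmt.Println("network:", ipnet.IP)             // 10.233.64.0
	fmt.Println("mask:", net.IP(ipnet.Mask))      // 255.255.255.0
	fmt.Printf("addresses: %d\n", 1<<(bits-ones)) // 256
}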
1 change: 1 addition & 0 deletions pkg/kubelet/kubelet_node_status.go
@@ -445,6 +445,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {

now := kl.clock.Now()
if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
// podCIDR unchanged && node status unchanged
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
// We must mark the volumes as ReportedInUse in volume manager's dsw even
// if no changes were made to the node status (no volumes were added or removed
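The gate above skips the status PATCH only when the report window has not yet elapsed and neither the pod CIDR nor the node status changed. A standalone sketch of the time check — nodeStatusReportFrequency is assumed to be the 5-minute default here, but it is configurable:

package main

import (
	"fmt"
	"time"
)

func main() {
	nodeStatusReportFrequency := 5 * time.Minute // assumed default
	lastStatusReportTime := time.Now().Add(-2 * time.Minute)
	podCIDRChanged, nodeStatusChanged := false, false

	now := time.Now()
	if now.Before(lastStatusReportTime.Add(nodeStatusReportFrequency)) &&
		!podCIDRChanged && !nodeStatusChanged {
		fmt.Println("within the report window, nothing changed: skip the update")
		return
	}
	fmt.Println("send node status update")
}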
5 changes: 5 additions & 0 deletions pkg/kubelet/kubelet_pods.go
@@ -147,6 +147,7 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
for i, mount := range container.VolumeMounts {
// do not mount /etc/hosts if container is already mounting on the path
mountEtcHostsFile = mountEtcHostsFile && (mount.MountPath != etcHostsPath)
// Check whether the pod's volumes contain the volume referenced by this container mount.
vol, ok := podVolumes[mount.Name]
if !ok || vol.Mounter == nil {
klog.Errorf("Mount cannot be satisfied for container %q, because the volume is missing (ok=%v) or the volume mounter (vol.Mounter) is nil (vol=%+v): %+v", container.Name, ok, vol, mount)
@@ -218,6 +219,7 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
return nil, cleanupAction, fmt.Errorf("failed to create subPath directory for volumeMount %q of container %q", mount.Name, container.Name)
}
}
// TODO
hostPath, cleanupAction, err = subpather.PrepareSafeSubpath(subpath.Subpath{
VolumeMountIndex: i,
Path: hostPath,
@@ -261,6 +263,8 @@ func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, h
Propagation: propagation,
})
}


if mountEtcHostsFile {
hostAliases := pod.Spec.HostAliases
hostsMount, err := makeHostsMount(podDir, podIPs, hostName, hostDomain, hostAliases, pod.Spec.HostNetwork)
@@ -324,6 +328,7 @@ func ensureHostsFile(fileName string, hostIPs []string, hostName, hostDomainName
var hostsFileContent []byte
var err error

// The pod uses the host network namespace.
if useHostNetwork {
// if Pod is using host network, read hosts file from the node's filesystem.
// `etcHostsPath` references the location of the hosts file on the node.
2 changes: 2 additions & 0 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
@@ -143,6 +143,7 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
}

// Step 2: create the container.
// Generate a reference to the container.
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
klog.Errorf("Can't make a ref to pod %q, container %v: %v", format.Pod(pod), container.Name, err)
@@ -165,6 +166,7 @@
return s.Message(), ErrCreateContainerConfig
}

// Generate the config needed to create the container: devices, volumes, environment variables, annotations, etc.
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, podIPs, target)
if cleanupAction != nil {
defer cleanupAction()
8 changes: 7 additions & 1 deletion pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -423,7 +423,9 @@ type podActions struct {

// podSandboxChanged checks whether the spec of the pod is changed and returns
// (changed, new attempt, original sandboxID if exist).
func (m *kubeGenericRuntimeManager) podSandboxChanged(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, uint32, string) {


if len(podStatus.SandboxStatuses) == 0 {
klog.V(2).Infof("No sandbox for pod %q can be found. Need to start a new one", format.Pod(pod))
return true, 0, ""
@@ -526,6 +528,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *ku
}
}

// Populate the NextInitContainerToStart field.
if len(pod.Spec.InitContainers) != 0 {
// Pod has init containers, return the first one.
changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
@@ -678,6 +681,7 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontaine
}

// Step 2: Kill the pod if the sandbox has changed.
// Creation path: the sandbox will be recreated.
if podContainerChanges.KillPod {
if podContainerChanges.CreateSandbox {
klog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))
@@ -738,6 +742,7 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontaine
klog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))
createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))
result.AddSyncResult(createSandboxResult)
// Logic that creates the pause (sandbox) container.
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
if err != nil {
createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)
@@ -751,6 +756,7 @@
}
klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))

// Check the pause (sandbox) container's status.
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
if err != nil {
ref, referr := ref.GetReference(legacyscheme.Scheme, pod)
7 changes: 7 additions & 0 deletions pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
@@ -84,16 +84,20 @@ func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *v1.Pod, attemp
Uid: podUID,
Attempt: attempt,
},
// Inject labels and annotations.
Labels: newPodLabels(pod),
Annotations: newPodAnnotations(pod),
}

// DNS configuration.
dnsConfig, err := m.runtimeHelper.GetPodDNS(pod)
if err != nil {
return nil, err
}

podSandboxConfig.DnsConfig = dnsConfig

// When not sharing the host network namespace, generate a hostname for the pause container.
if !kubecontainer.IsHostNetworkPod(pod) {
// TODO: Add domain support in new runtime interface
hostname, _, err := m.runtimeHelper.GeneratePodHostNameAndDomain(pod)
@@ -103,9 +107,11 @@
podSandboxConfig.Hostname = hostname
}

// Log directory.
logDir := BuildPodLogsDirectory(pod.Namespace, pod.Name, pod.UID)
podSandboxConfig.LogDirectory = logDir

// Port mappings.
portMappings := []*runtimeapi.PortMapping{}
for _, c := range pod.Spec.Containers {
containerPortMappings := kubecontainer.MakePortMappings(&c)
@@ -128,6 +134,7 @@
podSandboxConfig.PortMappings = portMappings
}

// Linux-specific config: security context, SELinux, system calls, etc.
lc, err := m.generatePodSandboxLinuxConfig(pod)
if err != nil {
return nil, err
2 changes: 2 additions & 0 deletions pkg/volume/util/subpath/subpath_linux.go
@@ -69,6 +69,8 @@ func (sp *subpath) SafeMakeDir(subdir string, base string, perm os.FileMode) err
}

func (sp *subpath) PrepareSafeSubpath(subPath Subpath) (newHostPath string, cleanupAction func(), err error) {

//
newHostPath, err = doBindSubPath(sp.mounter, subPath)

// There is no action when the container starts. Bind-mount will be cleaned
