diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 6d4bf492038f5..c0b71ec00bf26 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -597,6 +597,7 @@ func getReservedCPUs(machineInfo *cadvisorapi.MachineInfo, cpus string) (cpuset. func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) { // Set global feature gates based on the value on the initial KubeletServer + logger := klog.FromContext(ctx) err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates) if err != nil { return err @@ -894,7 +895,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend } if kubeDeps.NodeStartupLatencyTracker == nil { - kubeDeps.NodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker() + kubeDeps.NodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker(logger) } // TODO(vmarmol): Do this through container config. diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml index 90863541f2714..66da771cc4bc0 100644 --- a/hack/golangci-hints.yaml +++ b/hack/golangci-hints.yaml @@ -178,6 +178,7 @@ linters-settings: # please keep this alphabetized contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/oom/.* contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* + contextual k8s.io/kubernetes/pkg/kubelet/util/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/hack/golangci.yaml b/hack/golangci.yaml index f7c834116f35a..b19e20e331838 100644 --- a/hack/golangci.yaml +++ b/hack/golangci.yaml @@ -244,6 +244,7 @@ linters-settings: # please keep this alphabetized contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/oom/.* contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* + contextual k8s.io/kubernetes/pkg/kubelet/util/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. Once it is GA, we can lift diff --git a/hack/logcheck.conf b/hack/logcheck.conf index 82507bd11ae4c..f9b0480bc077a 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -56,6 +56,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/token/.* contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.* contextual k8s.io/kubernetes/pkg/kubelet/oom/.* contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.* +contextual k8s.io/kubernetes/pkg/kubelet/util/.* # As long as contextual logging is alpha or beta, all WithName, WithValues, # NewContext calls have to go through klog. 
Once it is GA, we can lift diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 434323421eac2..bb3432f22f28d 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -209,7 +209,10 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err) } - isSwapOn, err := swap.IsSwapOn() + // TODO: it needs to be replaced by a proper context in the future + ctx := context.TODO() + logger := klog.FromContext(ctx) + isSwapOn, err := swap.IsSwapOn(logger) if err != nil { return nil, fmt.Errorf("failed to determine if swap is on: %w", err) } @@ -219,7 +222,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I return nil, fmt.Errorf("running with swap on is not supported, please disable swap or set --fail-swap-on flag to false") } - if !swap.IsTmpfsNoswapOptionSupported(mountUtil, nodeConfig.KubeletRootDir) { + if !swap.IsTmpfsNoswapOptionSupported(logger, mountUtil, nodeConfig.KubeletRootDir) { nodeRef := nodeRefFromNode(string(nodeConfig.NodeName)) recorder.Event(nodeRef, v1.EventTypeWarning, events.PossibleMemoryBackedVolumesOnDisk, "The tmpfs noswap option is not supported. Memory-backed volumes (e.g. secrets, emptyDirs, etc.) "+ diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 34fde1a103809..15fddb501bd5d 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -52,7 +52,7 @@ const ( ) type podStartupSLIObserver interface { - ObservedPodOnWatch(pod *v1.Pod, when time.Time) + ObservedPodOnWatch(logger klog.Logger, pod *v1.Pod, when time.Time) } // PodConfig is a configuration mux that merges many sources of pod configuration into a single @@ -157,12 +157,12 @@ func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificatio // Merge normalizes a set of incoming changes from different sources into a map of all Pods // and ensures that redundant changes are filtered out, and then pushes zero or more minimal // updates onto the update channel. Ensures that updates are delivered in order. 
-func (s *podStorage) Merge(source string, change interface{}) error { +func (s *podStorage) Merge(logger klog.Logger, source string, change interface{}) error { s.updateLock.Lock() defer s.updateLock.Unlock() seenBefore := s.sourcesSeen.Has(source) - adds, updates, deletes, removes, reconciles := s.merge(source, change) + adds, updates, deletes, removes, reconciles := s.merge(logger, source, change) firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications @@ -216,7 +216,7 @@ func (s *podStorage) Merge(source string, change interface{}) error { return nil } -func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) { +func (s *podStorage) merge(logger klog.Logger, source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) { s.podLock.Lock() defer s.podLock.Unlock() @@ -244,7 +244,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source // ignore static pods if !kubetypes.IsStaticPod(ref) { - s.startupSLIObserver.ObservedPodOnWatch(ref, time.Now()) + s.startupSLIObserver.ObservedPodOnWatch(logger, ref, time.Now()) } if existing, found := oldPods[ref.UID]; found { pods[ref.UID] = existing diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index b2e652c6c2e26..7e52604a953b3 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/test/utils/ktesting" @@ -64,7 +65,8 @@ func (s sortedPods) Less(i, j int) bool { type mockPodStartupSLIObserver struct{} -func (m *mockPodStartupSLIObserver) ObservedPodOnWatch(pod *v1.Pod, when time.Time) {} +func (m *mockPodStartupSLIObserver) ObservedPodOnWatch(logger klog.Logger, pod *v1.Pod, when time.Time) { +} func CreateValidPod(name, namespace string) *v1.Pod { return &v1.Pod{ diff --git a/pkg/kubelet/config/mux.go b/pkg/kubelet/config/mux.go index a2b3e1e0a5f68..10c304c0a7cf1 100644 --- a/pkg/kubelet/config/mux.go +++ b/pkg/kubelet/config/mux.go @@ -28,7 +28,7 @@ type merger interface { // Invoked when a change from a source is received. May also function as an incremental // merger if you wish to consume changes incrementally. Must be reentrant when more than // one source is defined. - Merge(source string, update interface{}) error + Merge(logger klog.Logger, source string, update interface{}) error } // mux is a class for merging configuration from multiple sources. 
Changes are @@ -70,14 +70,16 @@ func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interf newChannel := make(chan interface{}) m.sources[source] = newChannel - go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done()) + logger := klog.FromContext(ctx) + + go wait.Until(func() { m.listen(logger, source, newChannel) }, 0, ctx.Done()) return newChannel } -func (m *mux) listen(source string, listenChannel <-chan interface{}) { +func (m *mux) listen(logger klog.Logger, source string, listenChannel <-chan interface{}) { for update := range listenChannel { - if err := m.merger.Merge(source, update); err != nil { - klog.InfoS("failed merging update", "err", err) + if err := m.merger.Merge(logger, source, update); err != nil { + logger.Info("failed merging update", "err", err) } } } diff --git a/pkg/kubelet/config/mux_test.go b/pkg/kubelet/config/mux_test.go index b24068505cf76..a5bfd7129e072 100644 --- a/pkg/kubelet/config/mux_test.go +++ b/pkg/kubelet/config/mux_test.go @@ -20,6 +20,8 @@ import ( "context" "reflect" "testing" + + "k8s.io/klog/v2" ) func TestConfigurationChannels(t *testing.T) { @@ -43,7 +45,7 @@ type MergeMock struct { t *testing.T } -func (m MergeMock) Merge(source string, update interface{}) error { +func (m MergeMock) Merge(logger klog.Logger, source string, update interface{}) error { if m.source != source { m.t.Errorf("Expected %s, Got %s", m.source, source) } @@ -63,10 +65,10 @@ func TestMergeInvoked(t *testing.T) { } // mergeFunc implements the Merger interface -type mergeFunc func(source string, update interface{}) error +type mergeFunc func(logger klog.Logger, source string, update interface{}) error -func (f mergeFunc) Merge(source string, update interface{}) error { - return f(source, update) +func (f mergeFunc) Merge(logger klog.Logger, source string, update interface{}) error { + return f(logger, source, update) } func TestSimultaneousMerge(t *testing.T) { @@ -74,7 +76,7 @@ func TestSimultaneousMerge(t *testing.T) { defer cancel() ch := make(chan bool, 2) - mux := newMux(mergeFunc(func(source string, update interface{}) error { + mux := newMux(mergeFunc(func(logger klog.Logger, source string, update interface{}) error { switch source { case "one": if update.(string) != "test" { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 59d4158ee8f6e..a1883b4ea277d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -666,7 +666,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker) klet.allocationManager = allocation.NewManager(klet.getRootDir()) - klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder) + klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(logger, klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder) klet.runtimeService = kubeDeps.RemoteRuntimeService @@ -991,7 +991,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, renewInterval, string(klet.nodeName), v1.NamespaceNodeLease, - util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName))) + util.SetNodeOwnerFunc(ctx, klet.heartbeatClient, string(klet.nodeName))) // setup node shutdown manager shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{ diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index f46defb0d032c..b0f2f5c9f4a78 100644 --- a/pkg/kubelet/kubelet_test.go 
+++ b/pkg/kubelet/kubelet_test.go @@ -275,7 +275,7 @@ func newTestKubeletWithImageList( podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker() kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker) kubelet.allocationManager = allocation.NewInMemoryManager() - kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker() + kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker(logger) kubelet.containerRuntime = fakeRuntime kubelet.runtimeCache = containertest.NewFakeRuntimeCache(kubelet.containerRuntime) @@ -302,7 +302,7 @@ func newTestKubeletWithImageList( } volumeStatsAggPeriod := time.Second * 10 - kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod, kubelet.recorder) + kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(logger, kubelet, volumeStatsAggPeriod, kubelet.recorder) fakeHostStatsProvider := stats.NewFakeHostStatsProvider() diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index fe58760dd4e93..5f12b7cb2de15 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -37,6 +37,7 @@ import ( cadvisorapiv2 "github.com/google/cadvisor/info/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/klog/v2" "k8s.io/utils/ptr" v1 "k8s.io/api/core/v1" @@ -53,6 +54,7 @@ import ( runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/test/utils/ktesting" // Do some initialization to decode the query parameters correctly. "k8s.io/apiserver/pkg/server/healthz" @@ -325,22 +327,21 @@ type serverTestFramework struct { testHTTPServer *httptest.Server } -func newServerTest() *serverTestFramework { - return newServerTestWithDebug(true, nil) +func newServerTest(logger klog.Logger) *serverTestFramework { + return newServerTestWithDebug(logger, true, nil) } -func newServerTestWithDebug(enableDebugging bool, streamingServer streaming.Server) *serverTestFramework { +func newServerTestWithDebug(logger klog.Logger, enableDebugging bool, streamingServer streaming.Server) *serverTestFramework { kubeCfg := &kubeletconfiginternal.KubeletConfiguration{ EnableDebuggingHandlers: enableDebugging, EnableSystemLogHandler: enableDebugging, EnableProfilingHandler: enableDebugging, EnableDebugFlagsHandler: enableDebugging, } - return newServerTestWithDebuggingHandlers(kubeCfg, streamingServer) + return newServerTestWithDebuggingHandlers(logger, kubeCfg, streamingServer) } -func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletConfiguration, streamingServer streaming.Server) *serverTestFramework { - +func newServerTestWithDebuggingHandlers(logger klog.Logger, kubeCfg *kubeletconfiginternal.KubeletConfiguration, streamingServer streaming.Server) *serverTestFramework { fw := &serverTestFramework{} fw.fakeKubelet = &fakeKubelet{ hostnameFunc: func() string { @@ -371,7 +372,7 @@ func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletCo } server := NewServer( fw.fakeKubelet, - stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}), + stats.NewResourceAnalyzer(logger, fw.fakeKubelet, time.Minute, &record.FakeRecorder{}), []healthz.HealthChecker{}, fw.fakeAuth, kubeCfg, @@ -390,7 +391,8 @@ func getPodName(name, namespace string) string { } func TestServeLogs(t *testing.T) 
{ - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() content := string(`
kubelet.loggoogle.log`) @@ -419,7 +421,8 @@ func TestServeLogs(t *testing.T) { } func TestServeRunInContainer(t *testing.T) { - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() output := "foo bar" podNamespace := "other" @@ -460,7 +463,8 @@ func TestServeRunInContainer(t *testing.T) { } func TestServeRunInContainerWithUID(t *testing.T) { - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() output := "foo bar" podNamespace := "other" @@ -503,7 +507,8 @@ func TestServeRunInContainerWithUID(t *testing.T) { } func TestHealthCheck(t *testing.T) { - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() fw.fakeKubelet.hostnameFunc = func() string { return "127.0.0.1" @@ -532,7 +537,8 @@ func assertHealthFails(t *testing.T, httpURL string, expectedErrorCode int) { // Ensure all registered handlers & services have an associated testcase. func TestAuthzCoverage(t *testing.T) { - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() for _, fineGrained := range []bool{false, true} { @@ -577,7 +583,8 @@ func TestAuthzCoverage(t *testing.T) { } func TestInstallAuthNotRequiredHandlers(t *testing.T) { - fw := newServerTestWithDebug(false, nil) + logger, _ := ktesting.NewTestContext(t) + fw := newServerTestWithDebug(logger, false, nil) defer fw.testHTTPServer.Close() // No new handlers should be added to this list. @@ -647,7 +654,8 @@ func TestAuthFilters(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, zpagesfeatures.ComponentStatusz, true) - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() attributesGetter := NewNodeAuthorizerAttributesGetter(authzTestNodeName) @@ -706,7 +714,8 @@ func TestAuthenticationError(t *testing.T) { calledAttributes = false ) - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) { calledAuthenticate = true @@ -744,7 +753,8 @@ func TestAuthenticationFailure(t *testing.T) { calledAttributes = false ) - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) { calledAuthenticate = true @@ -782,7 +792,8 @@ func TestAuthorizationSuccess(t *testing.T) { calledAttributes = false ) - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) { calledAuthenticate = true @@ -811,7 +822,8 @@ func TestAuthorizationSuccess(t *testing.T) { } func TestSyncLoopCheck(t *testing.T) { - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() fw.fakeKubelet.hostnameFunc = func() string { return "127.0.0.1" @@ -884,7 +896,8 @@ func setGetContainerLogsFunc(fw *serverTestFramework, t *testing.T, expectedPodN } func 
TestContainerLogs(t *testing.T) { - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() tests := map[string]struct { @@ -938,7 +951,8 @@ func TestContainerLogs(t *testing.T) { } func TestContainerLogsWithInvalidTail(t *testing.T) { - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() output := "foo bar" podNamespace := "other" @@ -965,7 +979,8 @@ func TestContainerLogsWithSeparateStream(t *testing.T) { msg string } - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() var ( @@ -1180,7 +1195,8 @@ func TestCheckpointContainer(t *testing.T) { // Enable features.ContainerCheckpoint during test featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, featureGate) - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) // GetPodByName() should always fail fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) { return nil, false @@ -1273,10 +1289,11 @@ func makeReq(t *testing.T, method, url, clientProtocol string) *http.Request { } func TestServeExecInContainerIdleTimeout(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) ss, err := newTestStreamingServer(100 * time.Millisecond) require.NoError(t, err) defer ss.testHTTPServer.Close() - fw := newServerTestWithDebug(true, ss) + fw := newServerTestWithDebug(logger, true, ss) defer fw.testHTTPServer.Close() podNamespace := "other" @@ -1313,6 +1330,7 @@ func TestServeExecInContainerIdleTimeout(t *testing.T) { } func testExecAttach(t *testing.T, verb string) { + logger, _ := ktesting.NewTestContext(t) tests := map[string]struct { stdin bool stdout bool @@ -1336,7 +1354,7 @@ func testExecAttach(t *testing.T, verb string) { ss, err := newTestStreamingServer(0) require.NoError(t, err) defer ss.testHTTPServer.Close() - fw := newServerTestWithDebug(true, ss) + fw := newServerTestWithDebug(logger, true, ss) defer fw.testHTTPServer.Close() fmt.Println(desc) @@ -1532,10 +1550,11 @@ func TestServeAttachContainer(t *testing.T) { } func TestServePortForwardIdleTimeout(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) ss, err := newTestStreamingServer(100 * time.Millisecond) require.NoError(t, err) defer ss.testHTTPServer.Close() - fw := newServerTestWithDebug(true, ss) + fw := newServerTestWithDebug(logger, true, ss) defer fw.testHTTPServer.Close() podNamespace := "other" @@ -1569,6 +1588,7 @@ func TestServePortForwardIdleTimeout(t *testing.T) { } func TestServePortForward(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) tests := map[string]struct { port string uid bool @@ -1597,7 +1617,7 @@ func TestServePortForward(t *testing.T) { ss, err := newTestStreamingServer(0) require.NoError(t, err) defer ss.testHTTPServer.Close() - fw := newServerTestWithDebug(true, ss) + fw := newServerTestWithDebug(logger, true, ss) defer fw.testHTTPServer.Close() portForwardFuncDone := make(chan struct{}) @@ -1732,7 +1752,8 @@ func TestMetricBuckets(t *testing.T) { "invalid path": {url: "/junk", bucket: "other"}, "invalid path starting with good": {url: "/healthzjunk", bucket: "other"}, } - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() for _, test := range tests { @@ -1752,7 +1773,8 @@ func TestMetricMethodBuckets(t *testing.T) { "invalid method": 
{method: "WEIRD", bucket: "other"}, } - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() for _, test := range tests { @@ -1763,6 +1785,7 @@ func TestMetricMethodBuckets(t *testing.T) { } func TestDebuggingDisabledHandlers(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) // for backward compatibility even if enablesystemLogHandler or enableProfilingHandler is set but not // enableDebuggingHandler then /logs, /pprof and /flags shouldn't be served. kubeCfg := &kubeletconfiginternal.KubeletConfiguration{ @@ -1771,7 +1794,7 @@ func TestDebuggingDisabledHandlers(t *testing.T) { EnableDebugFlagsHandler: true, EnableProfilingHandler: true, } - fw := newServerTestWithDebuggingHandlers(kubeCfg, nil) + fw := newServerTestWithDebuggingHandlers(logger, kubeCfg, nil) defer fw.testHTTPServer.Close() paths := []string{ @@ -1786,10 +1809,11 @@ func TestDebuggingDisabledHandlers(t *testing.T) { } func TestDisablingLogAndProfilingHandler(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) kubeCfg := &kubeletconfiginternal.KubeletConfiguration{ EnableDebuggingHandlers: true, } - fw := newServerTestWithDebuggingHandlers(kubeCfg, nil) + fw := newServerTestWithDebuggingHandlers(logger, kubeCfg, nil) defer fw.testHTTPServer.Close() // verify debug endpoints are disabled @@ -1799,7 +1823,8 @@ func TestDisablingLogAndProfilingHandler(t *testing.T) { } func TestFailedParseParamsSummaryHandler(t *testing.T) { - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() resp, err := http.Post(fw.testHTTPServer.URL+"/stats/summary", "invalid/content/type", nil) @@ -1852,7 +1877,8 @@ func TestFineGrainedAuthz(t *testing.T) { // Enable features.ContainerCheckpoint during test featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KubeletFineGrainedAuthz, true) - fw := newServerTest() + logger, _ := ktesting.NewTestContext(t) + fw := newServerTest(logger) defer fw.testHTTPServer.Close() attributesGetter := NewNodeAuthorizerAttributesGetter(authzTestNodeName) @@ -1953,12 +1979,13 @@ func TestFineGrainedAuthz(t *testing.T) { } func TestNewServerRegistersMetricsSLIsEndpointTwice(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) host := &fakeKubelet{ hostnameFunc: func() string { return "127.0.0.1" }, } - resourceAnalyzer := stats.NewResourceAnalyzer(nil, time.Minute, &record.FakeRecorder{}) + resourceAnalyzer := stats.NewResourceAnalyzer(logger, nil, time.Minute, &record.FakeRecorder{}) server1 := NewServer(host, resourceAnalyzer, []healthz.HealthChecker{}, nil, nil) server2 := NewServer(host, resourceAnalyzer, []healthz.HealthChecker{}, nil, nil) diff --git a/pkg/kubelet/server/server_websocket_test.go b/pkg/kubelet/server/server_websocket_test.go index 75b6712a5403f..0a50793990e31 100644 --- a/pkg/kubelet/server/server_websocket_test.go +++ b/pkg/kubelet/server/server_websocket_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubelet/pkg/cri/streaming/portforward" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -38,6 +39,7 @@ const ( ) func TestServeWSPortForward(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) tests := map[string]struct { port string uid bool @@ -66,7 +68,7 @@ func TestServeWSPortForward(t *testing.T) { ss, err := newTestStreamingServer(0) require.NoError(t, err) defer ss.testHTTPServer.Close() - fw := newServerTestWithDebug(true, ss) + fw := 
newServerTestWithDebug(logger, true, ss) defer fw.testHTTPServer.Close() portForwardFuncDone := make(chan struct{}) @@ -150,6 +152,7 @@ func TestServeWSPortForward(t *testing.T) { } func TestServeWSMultiplePortForward(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) portsText := []string{"7000,8000", "9000"} ports := []uint16{7000, 8000, 9000} podNamespace := "other" @@ -158,7 +161,7 @@ func TestServeWSMultiplePortForward(t *testing.T) { ss, err := newTestStreamingServer(0) require.NoError(t, err) defer ss.testHTTPServer.Close() - fw := newServerTestWithDebug(true, ss) + fw := newServerTestWithDebug(logger, true, ss) defer fw.testHTTPServer.Close() portForwardWG := sync.WaitGroup{} diff --git a/pkg/kubelet/server/stats/resource_analyzer.go b/pkg/kubelet/server/stats/resource_analyzer.go index 853eeb67b0f47..9e04305f02f04 100644 --- a/pkg/kubelet/server/stats/resource_analyzer.go +++ b/pkg/kubelet/server/stats/resource_analyzer.go @@ -17,8 +17,10 @@ limitations under the License. package stats import ( - "k8s.io/client-go/tools/record" "time" + + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" ) // ResourceAnalyzer provides statistics on node resource consumption @@ -38,9 +40,9 @@ type resourceAnalyzer struct { var _ ResourceAnalyzer = &resourceAnalyzer{} // NewResourceAnalyzer returns a new ResourceAnalyzer -func NewResourceAnalyzer(statsProvider Provider, calVolumeFrequency time.Duration, eventRecorder record.EventRecorder) ResourceAnalyzer { +func NewResourceAnalyzer(logger klog.Logger, statsProvider Provider, calVolumeFrequency time.Duration, eventRecorder record.EventRecorder) ResourceAnalyzer { fsAnalyzer := newFsResourceAnalyzer(statsProvider, calVolumeFrequency, eventRecorder) - summaryProvider := NewSummaryProvider(statsProvider) + summaryProvider := NewSummaryProvider(logger, statsProvider) return &resourceAnalyzer{fsAnalyzer, summaryProvider} } diff --git a/pkg/kubelet/server/stats/summary.go b/pkg/kubelet/server/stats/summary.go index afc2f475a6587..4192672dfcec1 100644 --- a/pkg/kubelet/server/stats/summary.go +++ b/pkg/kubelet/server/stats/summary.go @@ -51,12 +51,12 @@ var _ SummaryProvider = &summaryProviderImpl{} // NewSummaryProvider returns a SummaryProvider using the stats provided by the // specified statsProvider. -func NewSummaryProvider(statsProvider Provider) SummaryProvider { +func NewSummaryProvider(logger klog.Logger, statsProvider Provider) SummaryProvider { kubeletCreationTime := metav1.Now() - bootTime, err := util.GetBootTime() + bootTime, err := util.GetBootTime(logger) if err != nil { // bootTime will be zero if we encounter an error getting the boot time. - klog.InfoS("Error getting system boot time. Node metrics will have an incorrect start time", "err", err) + logger.Info("Error getting system boot time. 
Node metrics will have an incorrect start time", "err", err) } return &summaryProviderImpl{ diff --git a/pkg/kubelet/server/stats/summary_test.go b/pkg/kubelet/server/stats/summary_test.go index 938a6354bb7f7..9b239abd486ff 100644 --- a/pkg/kubelet/server/stats/summary_test.go +++ b/pkg/kubelet/server/stats/summary_test.go @@ -29,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cm" statstest "k8s.io/kubernetes/pkg/kubelet/server/stats/testing" @@ -241,6 +242,7 @@ func TestSummaryProviderGetStatsSplitImageFs(t *testing.T) { func TestSummaryProviderGetCPUAndMemoryStats(t *testing.T) { ctx := context.Background() + logger := klog.FromContext(ctx) assert := assert.New(t) podStats := []statsapi.PodStats{ @@ -272,7 +274,7 @@ func TestSummaryProviderGetCPUAndMemoryStats(t *testing.T) { mockStatsProvider.EXPECT().GetCgroupCPUAndMemoryStats("/kubelet", false).Return(cgroupStatsMap["/kubelet"].cs, nil) mockStatsProvider.EXPECT().GetCgroupCPUAndMemoryStats("/kubepods", false).Return(cgroupStatsMap["/pods"].cs, nil) - provider := NewSummaryProvider(mockStatsProvider) + provider := NewSummaryProvider(logger, mockStatsProvider) summary, err := provider.GetCPUAndMemoryStats(ctx) assert.NoError(err) diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 323a2192aced3..05036d497dbe2 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -105,7 +105,7 @@ type PodDeletionSafetyProvider interface { } type PodStartupLatencyStateHelper interface { - RecordStatusUpdated(pod *v1.Pod) + RecordStatusUpdated(logger klog.Logger, pod *v1.Pod) DeletePodStartupState(podUID types.UID) } @@ -786,9 +786,13 @@ func (m *manager) syncBatch(all bool) int { // syncPod syncs the given status with the API server. The caller must not hold the status lock. func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { // TODO: make me easier to express from client code - pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(context.TODO(), status.podName, metav1.GetOptions{}) + // TODO: it needs to be replaced by a proper context in the future + ctx := context.TODO() + logger := klog.FromContext(ctx) + + pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(ctx, status.podName, metav1.GetOptions{}) if errors.IsNotFound(err) { - klog.V(3).InfoS("Pod does not exist on the server", + logger.V(3).Info("Pod does not exist on the server", "podUID", uid, "pod", klog.KRef(status.podNamespace, status.podName)) // If the Pod is deleted the status will be cleared in @@ -796,7 +800,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { return } if err != nil { - klog.InfoS("Failed to get status for pod", + logger.Info("Failed to get status for pod", "podUID", uid, "pod", klog.KRef(status.podNamespace, status.podName), "err", err) @@ -806,7 +810,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { translatedUID := m.podManager.TranslatePodUID(pod.UID) // Type convert original uid just for the purpose of comparison. 
if len(translatedUID) > 0 && translatedUID != kubetypes.ResolvedPodUID(uid) { - klog.V(2).InfoS("Pod was deleted and then recreated, skipping status update", + logger.V(2).Info("Pod was deleted and then recreated, skipping status update", "pod", klog.KObj(pod), "oldPodUID", uid, "podUID", translatedUID) @@ -816,25 +820,25 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod)) - newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus) - klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "podUID", uid, "patch", string(patchBytes)) + newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(ctx, m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus) + logger.V(3).Info("Patch status for pod", "pod", klog.KObj(pod), "podUID", uid, "patch", string(patchBytes)) if err != nil { - klog.InfoS("Failed to update status for pod", "pod", klog.KObj(pod), "err", err) + logger.Info("Failed to update status for pod", "pod", klog.KObj(pod), "err", err) return } if unchanged { - klog.V(3).InfoS("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version) + logger.V(3).Info("Status for pod is up-to-date", "pod", klog.KObj(pod), "statusVersion", status.version) } else { - klog.V(3).InfoS("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus) + logger.V(3).Info("Status for pod updated successfully", "pod", klog.KObj(pod), "statusVersion", status.version, "status", mergedStatus) pod = newPod // We pass a new object (result of API call which contains updated ResourceVersion) - m.podStartupLatencyHelper.RecordStatusUpdated(pod) + m.podStartupLatencyHelper.RecordStatusUpdated(logger, pod) } // measure how long the status update took to propagate from generation to update on the server if status.at.IsZero() { - klog.V(3).InfoS("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version) + logger.V(3).Info("Pod had no status time set", "pod", klog.KObj(pod), "podUID", uid, "version", status.version) } else { duration := time.Since(status.at).Truncate(time.Millisecond) metrics.PodStatusSyncDuration.Observe(duration.Seconds()) @@ -850,12 +854,12 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { // newly created pod with the same name and namespace. 
Preconditions: metav1.NewUIDPreconditions(string(pod.UID)), } - err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions) + err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, deleteOptions) if err != nil { - klog.InfoS("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err) + logger.Info("Failed to delete status for pod", "pod", klog.KObj(pod), "err", err) return } - klog.V(3).InfoS("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod)) + logger.V(3).Info("Pod fully terminated and removed from etcd", "pod", klog.KObj(pod)) m.deletePodStatus(uid) } } diff --git a/pkg/kubelet/util/boottime_util_darwin.go b/pkg/kubelet/util/boottime_util_darwin.go index b6a1cc38c813a..10e532da3f68e 100644 --- a/pkg/kubelet/util/boottime_util_darwin.go +++ b/pkg/kubelet/util/boottime_util_darwin.go @@ -26,10 +26,11 @@ import ( "unsafe" "golang.org/x/sys/unix" + "k8s.io/klog/v2" ) // GetBootTime returns the time at which the machine was started, truncated to the nearest second -func GetBootTime() (time.Time, error) { +func GetBootTime(logger klog.Logger) (time.Time, error) { output, err := unix.SysctlRaw("kern.boottime") if err != nil { return time.Time{}, err diff --git a/pkg/kubelet/util/boottime_util_darwin_test.go b/pkg/kubelet/util/boottime_util_darwin_test.go index 1138deeeb69e2..d4ff8a9d9954c 100644 --- a/pkg/kubelet/util/boottime_util_darwin_test.go +++ b/pkg/kubelet/util/boottime_util_darwin_test.go @@ -22,10 +22,13 @@ package util import ( "testing" "time" + + "k8s.io/kubernetes/test/utils/ktesting" ) func TestGetBootTime(t *testing.T) { - boottime, err := GetBootTime() + logger, _ := ktesting.NewTestContext(t) + boottime, err := GetBootTime(logger) if err != nil { t.Errorf("Unable to get system uptime") diff --git a/pkg/kubelet/util/boottime_util_freebsd.go b/pkg/kubelet/util/boottime_util_freebsd.go index 7d605d7611c1d..18308075c9e5e 100644 --- a/pkg/kubelet/util/boottime_util_freebsd.go +++ b/pkg/kubelet/util/boottime_util_freebsd.go @@ -23,12 +23,14 @@ import ( "fmt" "time" - "golang.org/x/sys/unix" "unsafe" + + "golang.org/x/sys/unix" + "k8s.io/klog/v2" ) // GetBootTime returns the time at which the machine was started, truncated to the nearest second -func GetBootTime() (time.Time, error) { +func GetBootTime(logger klog.Logger) (time.Time, error) { currentTime := time.Now() ts := &unix.Timeval{} _, _, e1 := unix.Syscall(uintptr(unix.SYS_CLOCK_GETTIME), uintptr(unix.CLOCK_UPTIME), uintptr(unsafe.Pointer(ts)), 0) diff --git a/pkg/kubelet/util/boottime_util_freebsd_test.go b/pkg/kubelet/util/boottime_util_freebsd_test.go index b79eda6cb6a40..25582e6a7ca1e 100644 --- a/pkg/kubelet/util/boottime_util_freebsd_test.go +++ b/pkg/kubelet/util/boottime_util_freebsd_test.go @@ -22,10 +22,13 @@ package util import ( "testing" "time" + + "k8s.io/kubernetes/test/utils/ktesting" ) func TestGetBootTime(t *testing.T) { - boottime, err := GetBootTime() + logger, _ := ktesting.NewTestContext(t) + boottime, err := GetBootTime(logger) if err != nil { t.Errorf("Unable to get system uptime") diff --git a/pkg/kubelet/util/boottime_util_linux.go b/pkg/kubelet/util/boottime_util_linux.go index 935b3d80341b6..4e1fc1e218587 100644 --- a/pkg/kubelet/util/boottime_util_linux.go +++ b/pkg/kubelet/util/boottime_util_linux.go @@ -33,10 +33,10 @@ import ( // GetBootTime returns the time at which the machine was started, truncated to the nearest second. 
// It uses /proc/stat first, which is more accurate, and falls back to the less accurate // unix.Sysinfo if /proc/stat failed. -func GetBootTime() (time.Time, error) { +func GetBootTime(logger klog.Logger) (time.Time, error) { bootTime, err := getBootTimeWithProcStat() if err != nil { - klog.InfoS("Failed to get boot time from /proc/uptime. Will retry with unix.Sysinfo.", "error", err) + logger.Info("Failed to get boot time from /proc/uptime. Will retry with unix.Sysinfo.", "error", err) return getBootTimeWithSysinfo() } return bootTime, nil diff --git a/pkg/kubelet/util/boottime_util_linux_test.go b/pkg/kubelet/util/boottime_util_linux_test.go index 9f77dfd0287ba..db384b48ae4f0 100644 --- a/pkg/kubelet/util/boottime_util_linux_test.go +++ b/pkg/kubelet/util/boottime_util_linux_test.go @@ -22,10 +22,13 @@ package util import ( "testing" "time" + + "k8s.io/kubernetes/test/utils/ktesting" ) func TestGetBootTime(t *testing.T) { - boottime, err := GetBootTime() + logger, _ := ktesting.NewTestContext(t) + boottime, err := GetBootTime(logger) if err != nil { t.Errorf("Unable to get system uptime") diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go index cbc42fa6bf1dc..13ab0319278a2 100644 --- a/pkg/kubelet/util/manager/watch_based_manager.go +++ b/pkg/kubelet/util/manager/watch_based_manager.go @@ -17,6 +17,7 @@ limitations under the License. package manager import ( + "context" "fmt" "sync" "time" @@ -343,7 +344,8 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) { if c.isImmutable(object) { item.setImmutable() if item.stop() { - klog.V(4).InfoS("Stopped watching for changes - object is immutable", "obj", klog.KRef(namespace, name)) + // TODO: it needs to be replaced by a proper context in the future + klog.FromContext(context.TODO()).V(4).Info("Stopped watching for changes - object is immutable", "obj", klog.KRef(namespace, name)) } } return object, nil @@ -352,12 +354,15 @@ func (c *objectCache) Get(namespace, name string) (runtime.Object, error) { } func (c *objectCache) startRecycleIdleWatch() { + // TODO: it needs to be replaced by a proper context in the future + ctx := context.TODO() + logger := klog.FromContext(ctx) c.lock.Lock() defer c.lock.Unlock() for key, item := range c.items { if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) { - klog.V(4).InfoS("Not acquired for long time, Stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime) + logger.V(4).Info("Not acquired for long time, Stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime) } } } diff --git a/pkg/kubelet/util/node_startup_latency_tracker.go b/pkg/kubelet/util/node_startup_latency_tracker.go index 815e4e81eaf91..81c947cc7b7e0 100644 --- a/pkg/kubelet/util/node_startup_latency_tracker.go +++ b/pkg/kubelet/util/node_startup_latency_tracker.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/utils/clock" ) @@ -46,8 +47,8 @@ type basicNodeStartupLatencyTracker struct { clock clock.Clock } -func NewNodeStartupLatencyTracker() NodeStartupLatencyTracker { - bootTime, err := GetBootTime() +func NewNodeStartupLatencyTracker(logger klog.Logger) NodeStartupLatencyTracker { + bootTime, err := GetBootTime(logger) if err != nil { bootTime = time.Time{} } diff --git a/pkg/kubelet/util/nodelease.go b/pkg/kubelet/util/nodelease.go index 9e1b00b76242e..dc1fa6fc3e670 100644 --- a/pkg/kubelet/util/nodelease.go +++ b/pkg/kubelet/util/nodelease.go @@ -29,14 
+29,14 @@ import ( // SetNodeOwnerFunc helps construct a newLeasePostProcessFunc which sets // a node OwnerReference to the given lease object -func SetNodeOwnerFunc(c clientset.Interface, nodeName string) func(lease *coordinationv1.Lease) error { +func SetNodeOwnerFunc(ctx context.Context, c clientset.Interface, nodeName string) func(lease *coordinationv1.Lease) error { return func(lease *coordinationv1.Lease) error { // Setting owner reference needs node's UID. Note that it is different from // kubelet.nodeRef.UID. When lease is initially created, it is possible that // the connection between master and node is not ready yet. So try to set // owner reference every time when renewing the lease, until successful. if len(lease.OwnerReferences) == 0 { - if node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}); err == nil { + if node, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}); err == nil { lease.OwnerReferences = []metav1.OwnerReference{ { APIVersion: corev1.SchemeGroupVersion.WithKind("Node").Version, @@ -46,7 +46,8 @@ func SetNodeOwnerFunc(c clientset.Interface, nodeName string) func(lease *coordi }, } } else { - klog.ErrorS(err, "Failed to get node when trying to set owner ref to the node lease", "node", klog.KRef("", nodeName)) + logger := klog.FromContext(ctx) + logger.Error(err, "Failed to get node when trying to set owner ref to the node lease", "node", klog.KRef("", nodeName)) return err } } diff --git a/pkg/kubelet/util/pod_startup_latency_tracker.go b/pkg/kubelet/util/pod_startup_latency_tracker.go index 3ab3d2aaae537..c36784ff143ee 100644 --- a/pkg/kubelet/util/pod_startup_latency_tracker.go +++ b/pkg/kubelet/util/pod_startup_latency_tracker.go @@ -30,10 +30,10 @@ import ( // PodStartupLatencyTracker records key moments for startup latency calculation, // e.g. image pulling or pod observed running on watch. type PodStartupLatencyTracker interface { - ObservedPodOnWatch(pod *v1.Pod, when time.Time) + ObservedPodOnWatch(logger klog.Logger, pod *v1.Pod, when time.Time) RecordImageStartedPulling(podUID types.UID) RecordImageFinishedPulling(podUID types.UID) - RecordStatusUpdated(pod *v1.Pod) + RecordStatusUpdated(logger klog.Logger, pod *v1.Pod) DeletePodStartupState(podUID types.UID) } @@ -64,7 +64,7 @@ func NewPodStartupLatencyTracker() PodStartupLatencyTracker { } } -func (p *basicPodStartupLatencyTracker) ObservedPodOnWatch(pod *v1.Pod, when time.Time) { +func (p *basicPodStartupLatencyTracker) ObservedPodOnWatch(logger klog.Logger, pod *v1.Pod, when time.Time) { p.lock.Lock() defer p.lock.Unlock() @@ -101,7 +101,7 @@ func (p *basicPodStartupLatencyTracker) ObservedPodOnWatch(pod *v1.Pod, when tim imagePullingDuration := state.lastFinishedPulling.Sub(state.firstStartedPulling) podStartSLOduration := (podStartingDuration - imagePullingDuration).Seconds() - klog.InfoS("Observed pod startup duration", + logger.Info("Observed pod startup duration", "pod", klog.KObj(pod), "podStartSLOduration", podStartSLOduration, "podStartE2EDuration", podStartingDuration, @@ -149,7 +149,7 @@ func (p *basicPodStartupLatencyTracker) RecordImageFinishedPulling(podUID types. state.lastFinishedPulling = p.clock.Now() // Now is always grater than values from the past. 
} -func (p *basicPodStartupLatencyTracker) RecordStatusUpdated(pod *v1.Pod) { +func (p *basicPodStartupLatencyTracker) RecordStatusUpdated(logger klog.Logger, pod *v1.Pod) { p.lock.Lock() defer p.lock.Unlock() @@ -169,7 +169,7 @@ func (p *basicPodStartupLatencyTracker) RecordStatusUpdated(pod *v1.Pod) { } if hasPodStartedSLO(pod) { - klog.V(3).InfoS("Mark when the pod was running for the first time", "pod", klog.KObj(pod), "rv", pod.ResourceVersion) + logger.V(3).Info("Mark when the pod was running for the first time", "pod", klog.KObj(pod), "rv", pod.ResourceVersion) state.observedRunningTime = p.clock.Now() } } diff --git a/pkg/kubelet/util/pod_startup_latency_tracker_test.go b/pkg/kubelet/util/pod_startup_latency_tracker_test.go index 5679f0568ddc3..084491ad166c3 100644 --- a/pkg/kubelet/util/pod_startup_latency_tracker_test.go +++ b/pkg/kubelet/util/pod_startup_latency_tracker_test.go @@ -25,9 +25,11 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/kubelet/metrics" + "k8s.io/kubernetes/test/utils/ktesting" testingclock "k8s.io/utils/clock/testing" ) @@ -60,6 +62,7 @@ func TestNoEvents(t *testing.T) { } func TestPodsRunningBeforeKubeletStarted(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) t.Run("pod was running for 10m before kubelet started", func(t *testing.T) { // expects no metrics in the output @@ -80,7 +83,7 @@ func TestPodsRunningBeforeKubeletStarted(t *testing.T) { StartTime: &metav1.Time{Time: frozenTime.Add(-10 * time.Minute)}, }, } - tracker.ObservedPodOnWatch(podStarted, frozenTime) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime) assert.Empty(t, tracker.pods) metrics.PodStartSLIDuration.Reset() @@ -88,6 +91,8 @@ func TestPodsRunningBeforeKubeletStarted(t *testing.T) { } func TestSinglePodOneImageDownloadRecorded(t *testing.T) { + tCtx := ktesting.Init(t) + logger := klog.FromContext(tCtx) t.Run("single pod; started in 3s, image pulling 100ms", func(t *testing.T) { @@ -134,7 +139,7 @@ kubelet_pod_start_sli_duration_seconds_count 1 } podInit := buildInitializingPod() - tracker.ObservedPodOnWatch(podInit, frozenTime) + tracker.ObservedPodOnWatch(logger, podInit, frozenTime) // image pulling took 100ms tracker.RecordImageStartedPulling(podInit.UID) @@ -151,10 +156,10 @@ kubelet_pod_start_sli_duration_seconds_count 1 } podStarted := buildRunningPod() - tracker.RecordStatusUpdated(podStarted) + tracker.RecordStatusUpdated(logger, podStarted) // 3s later, observe the same pod on watch - tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*3)) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime.Add(time.Second*3)) if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil { t.Fatal(err) @@ -169,6 +174,8 @@ kubelet_pod_start_sli_duration_seconds_count 1 } func TestSinglePodMultipleDownloadsAndRestartsRecorded(t *testing.T) { + tCtx := ktesting.Init(t) + logger := klog.FromContext(tCtx) t.Run("single pod; started in 30s, image pulling between 10th and 20th seconds", func(t *testing.T) { @@ -215,7 +222,7 @@ kubelet_pod_start_sli_duration_seconds_count 1 } podInitializing := buildInitializingPod() - tracker.ObservedPodOnWatch(podInitializing, frozenTime) + tracker.ObservedPodOnWatch(logger, podInitializing, frozenTime) // image pulling started at 10s and the last one finished at 30s // first image starts pulling at 10s @@ -249,19 +256,19 @@ 
kubelet_pod_start_sli_duration_seconds_count 1 // pod started podStarted := buildRunningPod() - tracker.RecordStatusUpdated(podStarted) + tracker.RecordStatusUpdated(logger, podStarted) // at 30s observe the same pod on watch - tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*30)) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime.Add(time.Second*30)) if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil { t.Fatal(err) } // any new pod observations should not impact the metrics, as the pod should be recorder only once - tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*150)) - tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*200)) - tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*250)) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime.Add(time.Second*150)) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime.Add(time.Second*200)) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime.Add(time.Second*250)) if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), metricsName); err != nil { t.Fatal(err) @@ -276,6 +283,8 @@ kubelet_pod_start_sli_duration_seconds_count 1 } func TestFirstNetworkPodMetrics(t *testing.T) { + tCtx := ktesting.Init(t) + logger := klog.FromContext(tCtx) t.Run("first network pod; started in 30s, image pulling between 10th and 20th seconds", func(t *testing.T) { @@ -298,32 +307,32 @@ kubelet_first_network_pod_start_sli_duration_seconds 30 hostNetworkPodInitializing := buildInitializingPod() hostNetworkPodInitializing.UID = "11111-22222" hostNetworkPodInitializing.Spec.HostNetwork = true - tracker.ObservedPodOnWatch(hostNetworkPodInitializing, frozenTime) + tracker.ObservedPodOnWatch(logger, hostNetworkPodInitializing, frozenTime) hostNetworkPodStarted := buildRunningPod() hostNetworkPodStarted.UID = "11111-22222" hostNetworkPodStarted.Spec.HostNetwork = true - tracker.RecordStatusUpdated(hostNetworkPodStarted) + tracker.RecordStatusUpdated(logger, hostNetworkPodStarted) // track only the first pod with network podInitializing := buildInitializingPod() - tracker.ObservedPodOnWatch(podInitializing, frozenTime) + tracker.ObservedPodOnWatch(logger, podInitializing, frozenTime) // pod started podStarted := buildRunningPod() - tracker.RecordStatusUpdated(podStarted) + tracker.RecordStatusUpdated(logger, podStarted) // at 30s observe the same pod on watch - tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*30)) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime.Add(time.Second*30)) if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), "kubelet_first_network_pod_start_sli_duration_seconds", "kubelet_first_network_pod_start_total_duration_seconds"); err != nil { t.Fatal(err) } // any new pod observations should not impact the metrics, as the pod should be recorder only once - tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*150)) - tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*200)) - tracker.ObservedPodOnWatch(podStarted, frozenTime.Add(time.Second*250)) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime.Add(time.Second*150)) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime.Add(time.Second*200)) + tracker.ObservedPodOnWatch(logger, podStarted, frozenTime.Add(time.Second*250)) if err := testutil.GatherAndCompare(metrics.GetGather(), strings.NewReader(wants), 
"kubelet_first_network_pod_start_sli_duration_seconds", "kubelet_first_network_pod_start_total_duration_seconds"); err != nil { t.Fatal(err) diff --git a/pkg/kubelet/util/swap/swap_util.go b/pkg/kubelet/util/swap/swap_util.go index 20f0e45ef5ff4..1502df619cbb7 100644 --- a/pkg/kubelet/util/swap/swap_util.go +++ b/pkg/kubelet/util/swap/swap_util.go @@ -41,8 +41,9 @@ var ( const TmpfsNoswapOption = "noswap" -func IsTmpfsNoswapOptionSupported(mounter mount.Interface, mountPath string) bool { - isTmpfsNoswapOptionSupportedHelper := func() bool { +func IsTmpfsNoswapOptionSupported(logger klog.Logger, mounter mount.Interface, mountPath string) bool { + // TODO: it needs to be replaced by a proper context in the future + isTmpfsNoswapOptionSupportedHelper := func(logger klog.Logger) bool { if sysruntime.GOOS == "windows" { return false } @@ -51,13 +52,13 @@ func IsTmpfsNoswapOptionSupported(mounter mount.Interface, mountPath string) boo // Turning off swap in unprivileged tmpfs mounts unsupported // https://github.com/torvalds/linux/blob/v6.8/mm/shmem.c#L4004-L4011 // https://github.com/kubernetes/kubernetes/issues/125137 - klog.InfoS("Running under a user namespace - tmpfs noswap is not supported") + logger.Info("Running under a user namespace - tmpfs noswap is not supported") return false } kernelVersion, err := utilkernel.GetVersion() if err != nil { - klog.ErrorS(err, "cannot determine kernel version, unable to determine is tmpfs noswap is supported") + logger.Error(err, "cannot determine kernel version, unable to determine is tmpfs noswap is supported") return false } @@ -66,45 +67,45 @@ func IsTmpfsNoswapOptionSupported(mounter mount.Interface, mountPath string) boo } if mountPath == "" { - klog.ErrorS(errors.New("mount path is empty, falling back to /tmp"), "") + logger.Error(errors.New("mount path is empty, falling back to /tmp"), "") } mountPath, err = os.MkdirTemp(mountPath, "tmpfs-noswap-test-") if err != nil { - klog.InfoS("error creating dir to test if tmpfs noswap is enabled. Assuming not supported", "mount path", mountPath, "error", err) + logger.Info("error creating dir to test if tmpfs noswap is enabled. Assuming not supported", "mount path", mountPath, "error", err) return false } defer func() { err = os.RemoveAll(mountPath) if err != nil { - klog.ErrorS(err, "error removing test tmpfs dir", "mount path", mountPath) + logger.Error(err, "error removing test tmpfs dir", "mount path", mountPath) } }() err = mounter.MountSensitiveWithoutSystemd("tmpfs", mountPath, "tmpfs", []string{TmpfsNoswapOption}, nil) if err != nil { - klog.InfoS("error mounting tmpfs with the noswap option. Assuming not supported", "error", err) + logger.Info("error mounting tmpfs with the noswap option. Assuming not supported", "error", err) return false } err = mounter.Unmount(mountPath) if err != nil { - klog.ErrorS(err, "error unmounting test tmpfs dir", "mount path", mountPath) + logger.Error(err, "error unmounting test tmpfs dir", "mount path", mountPath) } return true } tmpfsNoswapOptionAvailabilityOnce.Do(func() { - tmpfsNoswapOptionSupported = isTmpfsNoswapOptionSupportedHelper() + tmpfsNoswapOptionSupported = isTmpfsNoswapOptionSupportedHelper(logger) }) return tmpfsNoswapOptionSupported } // gets /proc/swaps's content as an input, returns true if swap is enabled. 
-func isSwapOnAccordingToProcSwaps(procSwapsContent []byte) bool { +func isSwapOnAccordingToProcSwaps(logger klog.Logger, procSwapsContent []byte) bool { procSwapsContent = bytes.TrimSpace(procSwapsContent) // extra trailing \n procSwapsStr := string(procSwapsContent) procSwapsLines := strings.Split(procSwapsStr, "\n") @@ -112,7 +113,7 @@ func isSwapOnAccordingToProcSwaps(procSwapsContent []byte) bool { // If there is more than one line (table headers) in /proc/swaps then swap is enabled isSwapOn := len(procSwapsLines) > 1 if isSwapOn { - klog.InfoS("Swap is on", "/proc/swaps contents", procSwapsStr) + logger.Info("Swap is on", "/proc/swaps contents", procSwapsStr) } return isSwapOn @@ -121,7 +122,8 @@ func isSwapOnAccordingToProcSwaps(procSwapsContent []byte) bool { // IsSwapOn detects whether swap in enabled on the system by inspecting // /proc/swaps. If the file does not exist, an os.NotFound error will be returned. // If running on windows, swap is assumed to always be false. -func IsSwapOn() (bool, error) { +func IsSwapOn(logger klog.Logger) (bool, error) { + // TODO: it needs to be replaced by a proper context in the future isSwapOnHelper := func() (bool, error) { if sysruntime.GOOS == "windows" { return false, nil @@ -131,14 +133,14 @@ func IsSwapOn() (bool, error) { procSwapsContent, err := os.ReadFile(swapFilePath) if err != nil { if os.IsNotExist(err) { - klog.InfoS("File does not exist, assuming that swap is disabled", "path", swapFilePath) + logger.Info("File does not exist, assuming that swap is disabled", "path", swapFilePath) return false, nil } return false, err } - return isSwapOnAccordingToProcSwaps(procSwapsContent), nil + return isSwapOnAccordingToProcSwaps(logger, procSwapsContent), nil } swapOnOnce.Do(func() { diff --git a/pkg/kubelet/util/swap/swap_util_test.go b/pkg/kubelet/util/swap/swap_util_test.go index 611c464b70fdd..1fe5cbb210e5a 100644 --- a/pkg/kubelet/util/swap/swap_util_test.go +++ b/pkg/kubelet/util/swap/swap_util_test.go @@ -16,7 +16,12 @@ limitations under the License. 
package swap -import "testing" +import ( + "testing" + + "k8s.io/klog/v2" + "k8s.io/kubernetes/test/utils/ktesting" +) func TestIsSwapEnabled(t *testing.T) { testCases := []struct { @@ -54,10 +59,12 @@ Filename Type Size Used Priority expectedEnabled: false, }, } + tCtx := ktesting.Init(t) + logger := klog.FromContext(tCtx) for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - isEnabled := isSwapOnAccordingToProcSwaps([]byte(tc.procSwapsContent)) + isEnabled := isSwapOnAccordingToProcSwaps(logger, []byte(tc.procSwapsContent)) if isEnabled != tc.expectedEnabled { t.Errorf("expected %v, got %v", tc.expectedEnabled, isEnabled) } diff --git a/pkg/kubelet/util/util_unsupported.go b/pkg/kubelet/util/util_unsupported.go index e95a7c583a899..9a02093756d66 100644 --- a/pkg/kubelet/util/util_unsupported.go +++ b/pkg/kubelet/util/util_unsupported.go @@ -22,6 +22,8 @@ package util import ( "fmt" "time" + + "k8s.io/klog/v2" ) // LockAndCheckSubPath empty implementation @@ -39,6 +41,6 @@ func LocalEndpoint(path, file string) (string, error) { } // GetBootTime empty implementation -func GetBootTime() (time.Time, error) { +func GetBootTime(logger klog.Logger) (time.Time, error) { return time.Time{}, fmt.Errorf("GetBootTime is unsupported in this build") } diff --git a/pkg/kubelet/util/util_windows.go b/pkg/kubelet/util/util_windows.go index c944a7d22f273..f1baf15320762 100644 --- a/pkg/kubelet/util/util_windows.go +++ b/pkg/kubelet/util/util_windows.go @@ -25,6 +25,8 @@ import ( "strings" "syscall" "time" + + "k8s.io/klog/v2" ) const npipeProtocol = "npipe" @@ -55,7 +57,7 @@ func LocalEndpoint(path, file string) (string, error) { var tickCount = syscall.NewLazyDLL("kernel32.dll").NewProc("GetTickCount64") // GetBootTime returns the time at which the machine was started, truncated to the nearest second -func GetBootTime() (time.Time, error) { +func GetBootTime(logger klog.Logger) (time.Time, error) { currentTime := time.Now() output, _, err := tickCount.Call() if errno, ok := err.(syscall.Errno); !ok || errno != 0 { diff --git a/pkg/kubemark/hollow_kubelet.go b/pkg/kubemark/hollow_kubelet.go index 523b6fcd65f42..137a73650b4e8 100644 --- a/pkg/kubemark/hollow_kubelet.go +++ b/pkg/kubemark/hollow_kubelet.go @@ -93,6 +93,9 @@ func NewHollowKubelet( imageService internalapi.ImageManagerService, runtimeService internalapi.RuntimeService, containerManager cm.ContainerManager) *HollowKubelet { + // TODO: it needs to be replaced by a proper context in the future + ctx := context.TODO() + logger := klog.FromContext(ctx) d := &kubelet.Dependencies{ KubeClient: client, HeartbeatClient: heartbeatClient, @@ -110,7 +113,7 @@ func NewHollowKubelet( Subpather: &subpath.FakeSubpath{}, HostUtil: hostutil.NewFakeHostUtil(nil), PodStartupLatencyTracker: kubeletutil.NewPodStartupLatencyTracker(), - NodeStartupLatencyTracker: kubeletutil.NewNodeStartupLatencyTracker(), + NodeStartupLatencyTracker: kubeletutil.NewNodeStartupLatencyTracker(logger), TracerProvider: noopoteltrace.NewTracerProvider(), Recorder: &record.FakeRecorder{}, // With real recorder we attempt to read /dev/kmsg. } diff --git a/pkg/volume/emptydir/empty_dir.go b/pkg/volume/emptydir/empty_dir.go index a9128f661faad..4bb7bd2b3be3e 100644 --- a/pkg/volume/emptydir/empty_dir.go +++ b/pkg/volume/emptydir/empty_dir.go @@ -17,6 +17,7 @@ limitations under the License. 
package emptydir import ( + "context" "fmt" "os" "path/filepath" @@ -248,6 +249,8 @@ func (ed *emptyDir) SetUpAt(dir string, mounterArgs volume.MounterArgs) error { // storage medium is the default, then the volume is ready. If the // medium is memory, and a mountpoint is present, then the volume is // ready. + ctx := context.TODO() + logger := klog.FromContext(ctx) readyDir := ed.getMetaDir() if volumeutil.IsReady(readyDir) { if ed.medium == v1.StorageMediumMemory && !notMnt { @@ -272,7 +275,7 @@ func (ed *emptyDir) SetUpAt(dir string, mounterArgs volume.MounterArgs) error { case ed.medium == v1.StorageMediumDefault: err = ed.setupDir(dir) case ed.medium == v1.StorageMediumMemory: - err = ed.setupTmpfs(dir) + err = ed.setupTmpfs(logger, dir) case v1helper.IsHugePageMedium(ed.medium): err = ed.setupHugepages(dir) default: @@ -320,7 +323,7 @@ func (ed *emptyDir) assignQuota(dir string, mounterSize *resource.Quantity) erro } // setupTmpfs creates a tmpfs mount at the specified directory. -func (ed *emptyDir) setupTmpfs(dir string) error { +func (ed *emptyDir) setupTmpfs(logger klog.Logger, dir string) error { if ed.mounter == nil { return fmt.Errorf("memory storage requested, but mounter is nil") } @@ -338,7 +341,7 @@ func (ed *emptyDir) setupTmpfs(dir string) error { return nil } - options := ed.generateTmpfsMountOptions(swap.IsTmpfsNoswapOptionSupported(ed.mounter, ed.plugin.host.GetPluginDir(emptyDirPluginName))) + options := ed.generateTmpfsMountOptions(swap.IsTmpfsNoswapOptionSupported(logger, ed.mounter, ed.plugin.host.GetPluginDir(emptyDirPluginName))) klog.V(3).Infof("pod %v: mounting tmpfs for volume %v", ed.pod.UID, ed.volName) return ed.mounter.MountSensitiveWithoutSystemd("tmpfs", dir, "tmpfs", options, nil) diff --git a/test/e2e_node/node_problem_detector_linux.go b/test/e2e_node/node_problem_detector_linux.go index a7c9e8ca7bb4c..b6f17604dacc1 100644 --- a/test/e2e_node/node_problem_detector_linux.go +++ b/test/e2e_node/node_problem_detector_linux.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" clientset "k8s.io/client-go/kubernetes" coreclientset "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/klog/v2" admissionapi "k8s.io/pod-security-admission/api" "k8s.io/kubernetes/pkg/kubelet/util" @@ -105,10 +106,11 @@ var _ = SIGDescribe("NodeProblemDetector", feature.NodeProblemDetector, framewor ginkgo.BeforeEach(func(ctx context.Context) { ginkgo.By("Calculate Lookback duration") + logger := klog.FromContext(ctx) var err error nodeTime = time.Now() - bootTime, err = util.GetBootTime() + bootTime, err = util.GetBootTime(logger) framework.ExpectNoError(err) // Set lookback duration longer than node up time.
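
Reviewer note: the production-side changes above all follow the same contextual-logging pattern — a caller derives a logger from its context with `klog.FromContext` and passes it explicitly to helpers that previously called the global `klog` functions, with `context.TODO()` left as a marked placeholder where no context has been plumbed through yet. The sketch below is illustrative only; `bootTime` and `run` are hypothetical stand-ins for the package functions touched by this diff, not copies of them.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/klog/v2"
)

// bootTime is an illustrative stand-in for helpers such as util.GetBootTime:
// instead of calling klog.InfoS directly, it logs through the logger handed
// in by the caller, so output is attributed to the caller's context.
func bootTime(logger klog.Logger) (time.Time, error) {
	logger.Info("Falling back to a less accurate boot-time source", "reason", "example")
	return time.Now().Add(-42 * time.Minute), nil
}

func run(ctx context.Context) error {
	// Derive the logger once at the boundary and thread it downward.
	logger := klog.FromContext(ctx)

	bt, err := bootTime(logger)
	if err != nil {
		return fmt.Errorf("failed to determine boot time: %w", err)
	}
	logger.Info("Determined boot time", "bootTime", bt)
	return nil
}

func main() {
	// Where no real context is available yet, the diff uses context.TODO()
	// as an explicit placeholder; klog.FromContext then falls back to the
	// global logger.
	if err := run(context.TODO()); err != nil {
		klog.Background().Error(err, "run failed")
	}
}
```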
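On the test side, the diff switches callers to `ktesting` so each test gets its own logger tied to `t`. A minimal sketch of both variants used above, assuming a hypothetical helper `doWork(logger)` standing in for functions such as `GetBootTime(logger)` or `NewNodeStartupLatencyTracker(logger)`:

```go
package example

import (
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/kubernetes/test/utils/ktesting"
)

// doWork is a hypothetical helper representing the logger-accepting
// functions changed in this diff.
func doWork(logger klog.Logger) {
	logger.Info("doing work")
}

func TestDoWork(t *testing.T) {
	// ktesting.NewTestContext returns a per-test logger (plus a context
	// carrying it) whose output is associated with t.
	logger, _ := ktesting.NewTestContext(t)
	doWork(logger)

	// ktesting.Init is the variant used in some tests above; it returns a
	// TContext, and klog.FromContext recovers the logger from it.
	tCtx := ktesting.Init(t)
	doWork(klog.FromContext(tCtx))
}
```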
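Where a helper is reached through an interface that has no context parameter (the mux `merger`, the pod startup SLI observer, the status manager's `PodStartupLatencyStateHelper`), the diff widens the method signature to carry the logger instead. A rough sketch of that shape, with hypothetical names:

```go
package example

import (
	"context"

	"k8s.io/klog/v2"
)

// merger mirrors the shape of the interfaces widened in this diff: the
// logger travels as an explicit argument because the method has no ctx.
type merger interface {
	Merge(logger klog.Logger, source string, update interface{}) error
}

type printingMerger struct{}

func (printingMerger) Merge(logger klog.Logger, source string, update interface{}) error {
	logger.Info("merging update", "source", source)
	return nil
}

// listen resolves the logger from the context once, outside the loop, and
// hands it to every Merge call, mirroring what mux.ChannelWithContext does
// before spawning its listen goroutine.
func listen(ctx context.Context, m merger, source string, updates <-chan interface{}) {
	logger := klog.FromContext(ctx)
	for update := range updates {
		if err := m.Merge(logger, source, update); err != nil {
			logger.Info("failed merging update", "err", err)
		}
	}
}
```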