Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

chore(kubelet): migrate utils to contextual logging #130480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion 3 cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,7 @@ func getReservedCPUs(machineInfo *cadvisorapi.MachineInfo, cpus string) (cpuset.

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
// Set global feature gates based on the value on the initial KubeletServer
logger := klog.FromContext(ctx)
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
Expand Down Expand Up @@ -894,7 +895,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}

if kubeDeps.NodeStartupLatencyTracker == nil {
kubeDeps.NodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
kubeDeps.NodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker(logger)
}

// TODO(vmarmol): Do this through container config.
Expand Down
1 change: 1 addition & 0 deletions 1 hack/golangci-hints.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
contextual k8s.io/kubernetes/pkg/kubelet/util/.*

# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
Expand Down
1 change: 1 addition & 0 deletions 1 hack/golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
contextual k8s.io/kubernetes/pkg/kubelet/util/.*

# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
Expand Down
1 change: 1 addition & 0 deletions 1 hack/logcheck.conf
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ contextual k8s.io/kubernetes/pkg/kubelet/token/.*
contextual k8s.io/kubernetes/pkg/kubelet/cadvisor/.*
contextual k8s.io/kubernetes/pkg/kubelet/oom/.*
contextual k8s.io/kubernetes/pkg/kubelet/sysctl/.*
contextual k8s.io/kubernetes/pkg/kubelet/util/.*

# As long as contextual logging is alpha or beta, all WithName, WithValues,
# NewContext calls have to go through klog. Once it is GA, we can lift
Expand Down
7 changes: 5 additions & 2 deletions 7 pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
}

isSwapOn, err := swap.IsSwapOn()
// TODO: it needs to be replaced by a proper context in the future
ctx := context.TODO()
logger := klog.FromContext(ctx)
isSwapOn, err := swap.IsSwapOn(logger)
if err != nil {
return nil, fmt.Errorf("failed to determine if swap is on: %w", err)
}
Expand All @@ -219,7 +222,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, fmt.Errorf("running with swap on is not supported, please disable swap or set --fail-swap-on flag to false")
}

if !swap.IsTmpfsNoswapOptionSupported(mountUtil, nodeConfig.KubeletRootDir) {
if !swap.IsTmpfsNoswapOptionSupported(logger, mountUtil, nodeConfig.KubeletRootDir) {
nodeRef := nodeRefFromNode(string(nodeConfig.NodeName))
recorder.Event(nodeRef, v1.EventTypeWarning, events.PossibleMemoryBackedVolumesOnDisk,
"The tmpfs noswap option is not supported. Memory-backed volumes (e.g. secrets, emptyDirs, etc.) "+
Expand Down
10 changes: 5 additions & 5 deletions 10 pkg/kubelet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const (
)

type podStartupSLIObserver interface {
ObservedPodOnWatch(pod *v1.Pod, when time.Time)
ObservedPodOnWatch(logger klog.Logger, pod *v1.Pod, when time.Time)
}

// PodConfig is a configuration mux that merges many sources of pod configuration into a single
Expand Down Expand Up @@ -157,12 +157,12 @@ func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificatio
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel. Ensures that updates are delivered in order.
func (s *podStorage) Merge(source string, change interface{}) error {
func (s *podStorage) Merge(logger klog.Logger, source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock()

seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, removes, reconciles := s.merge(source, change)
adds, updates, deletes, removes, reconciles := s.merge(logger, source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source)

// deliver update notifications
Expand Down Expand Up @@ -216,7 +216,7 @@ func (s *podStorage) Merge(source string, change interface{}) error {
return nil
}

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
func (s *podStorage) merge(logger klog.Logger, source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()

Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
// ignore static pods
if !kubetypes.IsStaticPod(ref) {
s.startupSLIObserver.ObservedPodOnWatch(ref, time.Now())
s.startupSLIObserver.ObservedPodOnWatch(logger, ref, time.Now())
}
if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing
Expand Down
4 changes: 3 additions & 1 deletion 4 pkg/kubelet/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/test/utils/ktesting"
Expand Down Expand Up @@ -64,7 +65,8 @@ func (s sortedPods) Less(i, j int) bool {

type mockPodStartupSLIObserver struct{}

func (m *mockPodStartupSLIObserver) ObservedPodOnWatch(pod *v1.Pod, when time.Time) {}
func (m *mockPodStartupSLIObserver) ObservedPodOnWatch(logger klog.Logger, pod *v1.Pod, when time.Time) {
}

func CreateValidPod(name, namespace string) *v1.Pod {
return &v1.Pod{
Expand Down
12 changes: 7 additions & 5 deletions 12 pkg/kubelet/config/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type merger interface {
// Invoked when a change from a source is received. May also function as an incremental
// merger if you wish to consume changes incrementally. Must be reentrant when more than
// one source is defined.
Merge(source string, update interface{}) error
Merge(logger klog.Logger, source string, update interface{}) error
}

// mux is a class for merging configuration from multiple sources. Changes are
Expand Down Expand Up @@ -70,14 +70,16 @@ func (m *mux) ChannelWithContext(ctx context.Context, source string) chan interf
newChannel := make(chan interface{})
m.sources[source] = newChannel

go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done())
logger := klog.FromContext(ctx)

go wait.Until(func() { m.listen(logger, source, newChannel) }, 0, ctx.Done())
return newChannel
}

func (m *mux) listen(source string, listenChannel <-chan interface{}) {
func (m *mux) listen(logger klog.Logger, source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
if err := m.merger.Merge(source, update); err != nil {
klog.InfoS("failed merging update", "err", err)
if err := m.merger.Merge(logger, source, update); err != nil {
logger.Info("failed merging update", "err", err)
}
}
}
12 changes: 7 additions & 5 deletions 12 pkg/kubelet/config/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"reflect"
"testing"

"k8s.io/klog/v2"
)

func TestConfigurationChannels(t *testing.T) {
Expand All @@ -43,7 +45,7 @@ type MergeMock struct {
t *testing.T
}

func (m MergeMock) Merge(source string, update interface{}) error {
func (m MergeMock) Merge(logger klog.Logger, source string, update interface{}) error {
if m.source != source {
m.t.Errorf("Expected %s, Got %s", m.source, source)
}
Expand All @@ -63,18 +65,18 @@ func TestMergeInvoked(t *testing.T) {
}

// mergeFunc implements the Merger interface
type mergeFunc func(source string, update interface{}) error
type mergeFunc func(logger klog.Logger, source string, update interface{}) error

func (f mergeFunc) Merge(source string, update interface{}) error {
return f(source, update)
func (f mergeFunc) Merge(logger klog.Logger, source string, update interface{}) error {
return f(logger, source, update)
}

func TestSimultaneousMerge(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan bool, 2)
mux := newMux(mergeFunc(func(source string, update interface{}) error {
mux := newMux(mergeFunc(func(logger klog.Logger, source string, update interface{}) error {
switch source {
case "one":
if update.(string) != "test" {
Expand Down
4 changes: 2 additions & 2 deletions 4 pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
klet.allocationManager = allocation.NewManager(klet.getRootDir())

klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(logger, klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)

klet.runtimeService = kubeDeps.RemoteRuntimeService

Expand Down Expand Up @@ -991,7 +991,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
renewInterval,
string(klet.nodeName),
v1.NamespaceNodeLease,
util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName)))
util.SetNodeOwnerFunc(ctx, klet.heartbeatClient, string(klet.nodeName)))

// setup node shutdown manager
shutdownManager := nodeshutdown.NewManager(&nodeshutdown.Config{
Expand Down
4 changes: 2 additions & 2 deletions 4 pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func newTestKubeletWithImageList(
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker)
kubelet.allocationManager = allocation.NewInMemoryManager()
kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker(logger)

kubelet.containerRuntime = fakeRuntime
kubelet.runtimeCache = containertest.NewFakeRuntimeCache(kubelet.containerRuntime)
Expand All @@ -302,7 +302,7 @@ func newTestKubeletWithImageList(
}

volumeStatsAggPeriod := time.Second * 10
kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod, kubelet.recorder)
kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(logger, kubelet, volumeStatsAggPeriod, kubelet.recorder)

fakeHostStatsProvider := stats.NewFakeHostStatsProvider()

Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.