zhangguanzhang's Blog

kube-log-runner 使用和适配 logrotate 改造

字数统计: 1.5k阅读时长: 7 min
2025/04/21

最近使用 kube-log-runner 的经历…..

由来

kubelet 和一些 kube 组件在二进制 systemd service 管理下,日志最终会在 /var/log/messages 里,我们的客户对该文件会有关键字(Error、Failed …)监控,让我们把相关组件日志写到其他文件里去。

经过

选型

根据官方文档 系统日志 得知 v1.26 开始移除了以前的日志文件、目录和轮转之类的参数,而是让使用 kube-log-runner 代替,该二进制已经内置在二进制下载压缩包里了。如果是使用镜像,官方的容器镜像内置了,只是名字叫做 /go-runner

根据 github kube-log-runner 得知使用方式和源码,它就是 golang 写的一个简单工具,启动命令,把命令的标准输出和错误输出捕获写到文件,然后转发信号给进程。

尝试

1
2
3
4
5
$ systemctl cat --no-pager kube-apiserver
...
ExecStart=/usr/local/bin/kube-log-runner \
--log-file=/var/logs/kube-apiserver.log \
/usr/local/bin/kube-apiserver

直接测试了下发现启动报错下面:

1
4月 18 10:30:25 xxx systemd[1]: Got notification message from PID 9885, but reception only permitted for main PID 9880

如果对 systemd 比较熟悉,可以直接看出问题。因为我们这边用的 Type=notify,该设置下,服务启动后会发送 sd_notify 给 systemd,这样 systemd 确认该服务正常启动。这个报错就是:

  1. systemd 拉起的主进程 Pid 是 9880
  2. 从非主进程的 9885 收到了 notification 消息

解决该问题很简单,systemd 给了配置选项接收所有的 notify :

1
NotifyAccess=all

logrotate

日志要写入到文件,那就一定要遵守 Linux 规范配置 logrotate 避免日志写满分区。然后相关配置完,写完配置用一个小的 size 参数测试了 logrotate 发现不行:

1
logrotate -v /etc/logrotate.d/kube-apiserver

logrotate 轮转日志分为两种模式,create 和 copytruncate,默认是 create,两种大致原理如下:

  • create:重命名日志文件,再创建原有的日志文件,等同于 mv + touch
  • copytruncate:cp xx.log xx.log.1 && truncate -s 0 xx.log

Linux 上打开文件名实际是操作 inode,文件名在打开 inode 后改名或者删掉文件 path 对进程并不会有影响,create 方式需要进程支持 reopen 日志文件路径使用新的 inode,类似 nginx 的 logrotate 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/var/log/nginx/*.log /var/log/nginx/*/*.log{
daily
missingok
rotate 14
compress
delaycompress
notifempty
create 640 root adm
sharedscripts
postrotate
[ ! -f /var/run/nginx.pid ] || kill -USR1 `cat /var/run/nginx.pid`
endscript
}

kube-apiserver 并没有支持这种行为,就尝试了下 copytruncate 发现也不行,然后去 kep 的 kube-log-runner 下回复了下请求添加 logrotate 支持。

修改

等官方估计很久了,先自己修改下源码支持下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package main

import (
"flag"
"fmt"
"io"
"log"
"os"
"os/exec"
"os/signal"
"strings"
"sync"
"syscall"
)

var (
logFilePath = flag.String("log-file", "", "If non-empty, save stdout to this file")
alsoToStdOut = flag.Bool("also-stdout", false, "useful with log-file, log to standard output as well as the log file")
redirectStderr = flag.Bool("redirect-stderr", true, "treat stderr same as stdout")

// 新增全局变量
logFile *os.File
logFileMu sync.Mutex
globalLogFile string
globalAlsoStdOut bool
globalRedirectStderr bool
)

// SyncWriter 支持动态切换 Writer
type SyncWriter struct {
mu sync.Mutex
w io.Writer
}

func (sw *SyncWriter) Write(p []byte) (n int, err error) {
sw.mu.Lock()
defer sw.mu.Unlock()
return sw.w.Write(p)
}

func (sw *SyncWriter) SetWriter(w io.Writer) {
sw.mu.Lock()
defer sw.mu.Unlock()
sw.w = w
}

var (
outputSync = &SyncWriter{w: os.Stdout}
errSync = &SyncWriter{w: os.Stderr}
)

func main() {
flag.Parse()

if err := configureAndRun(); err != nil {
log.Fatal(err)
}
}

func configureAndRun() error {
// 保存参数到全局变量
globalLogFile = *logFilePath
globalAlsoStdOut = *alsoToStdOut
globalRedirectStderr = *redirectStderr

// 初始化日志文件
if globalLogFile != "" {
var err error
logFile, err = os.OpenFile(globalLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open log file: %w", err)
}

if globalAlsoStdOut {
outputSync.SetWriter(io.MultiWriter(os.Stdout, logFile))
} else {
outputSync.SetWriter(logFile)
}

if globalRedirectStderr {
errSync.SetWriter(outputSync)
}
}

args := flag.Args()
if len(args) == 0 {
return fmt.Errorf("not enough arguments to run")
}

exe := args[0]
var exeArgs []string
if len(args) > 1 {
exeArgs = args[1:]
}
cmd := exec.Command(exe, exeArgs...)
cmd.Stdout = outputSync
cmd.Stderr = errStream()

log.Printf("Running command:\n%v", cmdInfo(cmd))
err := cmd.Start()
if err != nil {
return fmt.Errorf("starting command: %w", err)
}

// 信号处理
go setupSigHandler(cmd.Process)
if err := cmd.Wait(); err != nil {
return fmt.Errorf("running command: %w", err)
}
return nil
}

func errStream() io.Writer {
if *redirectStderr {
return outputSync
}
return os.Stderr
}

func cmdInfo(cmd *exec.Cmd) string {
return fmt.Sprintf(
`Command env: (log-file=%v, also-stdout=%v, redirect-stderr=%v)
Run from directory: %v
Executable path: %v
Args (comma-delimited): %v`, *logFilePath, *alsoToStdOut, *redirectStderr,
cmd.Dir, cmd.Path, strings.Join(cmd.Args, ","),
)
}

// 修改后的信号处理函数
func setupSigHandler(process *os.Process) {
signals := []os.Signal{
syscall.SIGHUP, syscall.SIGINT,
syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1,
}
c := make(chan os.Signal, 1)
signal.Notify(c, signals...)

log.Println("Now listening for signals")
for s := range c {
if s == syscall.SIGUSR1 {
handleLogRotate()
} else {
forwardSignal(process, s)
}
}
}

// 处理日志轮转
func handleLogRotate() {
logFileMu.Lock()
defer logFileMu.Unlock()

if globalLogFile == "" {
log.Println("No log file configured, ignoring SIGUSR1")
return
}

// 关闭旧文件
if logFile != nil {
logFile.Sync()
if err := logFile.Close(); err != nil {
log.Printf("Error closing log file: %v", err)
}
}

// 打开新文件
newFile, err := os.OpenFile(globalLogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("ERROR: Failed to reopen log file: %v (logging to stdout)", err)
outputSync.SetWriter(os.Stdout)
if globalRedirectStderr {
errSync.SetWriter(os.Stdout)
}
return
}

logFile = newFile
log.SetOutput(logFile)
log.Println("Successfully reopened log file")

// 更新输出流
if globalAlsoStdOut {
outputSync.SetWriter(io.MultiWriter(os.Stdout, logFile))
} else {
outputSync.SetWriter(logFile)
}

if globalRedirectStderr {
errSync.SetWriter(outputSync)
}
}

func forwardSignal(process *os.Process, s os.Signal) {
log.Printf("Forwarding signal %v to PID %v", s, process.Pid)
if err := process.Signal(s); err != nil {
log.Printf("Error forwarding signal %v: %v", s, err)
}
}

上面代码不转发 USR1 信号给套娃的进程,自身处理 USR1 信号就是 reopen 日志文件。然后 logrotate 触发需要 pid 文件,systemd 里增加:

1
ExecStartPost=/bin/sh -c "echo $MAINPID > /var/run/kube-apiserver.pid"

logrotate 配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
/var/logs/kube-apiserver.log {
daily
rotate 5
size 400M
missingok
compress
nomail
delaycompress
create
postrotate
[ ! -f /var/run/kube-apiserver.pid ] || kill -USR1 `cat /var/run/kube-apiserver.pid`
endscript
}

参考

CATALOG
  1. 1. 由来
  2. 2. 经过
    1. 2.1. 选型
    2. 2.2. 尝试
    3. 2.3. logrotate
    4. 2.4. 修改
  3. 3. 参考