运维中,我们有时候想要记录服务器上谁登录了、干了什么,前者可以使用之前的ssh登录报警这篇文章,后者则可以参考本文。
此功能利用的是history命令、PROMPT_COMMAND环境变量、/etc/profile.d/扩展全局环境变量,Linux中的PROMPT_COMMAND会记录下出现提示符前面的命令,利用这个特性可以实现记录所有用户的操作记录。
环境:
centos6、7,要求安装iproute(yum install iproute)
原理:
自定义history和PROMPT_COMMAND相关环境变量,放到/etc/profile.d/中令全局生效,记录所有用户的命令操作。
步骤:
1. 将以下代码保存到/etc/profile.d/cmdhistLog.sh
# bash cmd audit
# root touch $HISTFILE && chmod a+w $HISTFILE, yum install iproute
readonly HISTSIZE=10000
readonly HISTFILESIZE=10000
readonly HISTFILE=/tmp/XingkaOpsCmdHist.log
readonly SERVER_IPS=$(/usr/sbin/ip a | grep inet | grep -v inet6 | grep -v 127 | sed \'s/^[ \t]*//g\' | cut -d \' \' -f2 | awk -F \'/\' \'{print $1}\' | xargs | sed \'s/\s/,/g\')
readonly PROMPT_COMMAND=\'{ \
if [ -z "$OLD_PWD" ];then
export OLD_PWD=$PWD;
fi;
if [ ! -z "$LAST_CMD" ] && [ "$(history 1)" != "$LAST_CMD" ]; then
msg=$(history 1 | { read x y; echo $y; });echo [$(date +"%Y-%m-%d %H:%M:%S")] [\($USER\) $(who am i |awk "{print \$1\" \"\$2\" \"\$NF}" | sed -e "s/[()]//g")] [$(hostname)@$SERVER_IPS] "$msg" >> $HISTFILE;
fi;
export LAST_CMD="$(history 1)";
export OLD_PWD=$PWD; }\'
shopt -s histappend
export HISTSIZE HISTFILESIZE HISTFILE PROMPT_COMMAND
2. 上述保存后,重新登录的终端即可生效,建议手动创建下日志文件,并需要授予所有人可写权限
# $HISTFILE改为实际日志文件路径
touch $HISTFILE && chmod a+w $HISTFILE
3. 使用python清洗日志,筛选出有效的命令数据并上报给运维平台、数据库等
# PS: 脚本可能需要根据实际情况自行修改。
#!/usr/bin/python
# coding: utf8
# Author: taochengwei
# Email: taochengwei@starokay.com
# Date: 2018-12-11
# Docs: Command the audit log report
import os
import re
import sys
import stat
import json
import urllib
import urllib2
import traceback
reload(sys)
sys.setdefaultencoding(\'utf-8\')
log = os.getenv(\'HISTFILE\', \'/tmp/XingkaOpsCmdHist.log\')
pat = re.compile(r\'^(\[.*\])\s(\[.*\])\s(\[.*\])\s(.*)$\')
if not os.path.isfile(log):
with open(log, "w") as f:
f.write("")
os.chmod(log, stat.S_IWUSR+stat.S_IRUSR+stat.S_IRGRP+stat.S_IWGRP+stat.S_IWOTH)
def post(url, data, token=None):
""" POST请求 """
if isinstance(data, dict):
data = urllib.urlencode(data) # 将字典以url形式编码
headers = {\'AccessToken\': token} if token else {}
headers.update({"User-Agent": "Mozilla/5.0 (X11; CentOS; Linux i686; rv:7.0.1406) Gecko/20100101 OpsRequestBot/0.1", "Content-Type": "application/json"})
request = urllib2.Request(url, data=data, headers=headers)
response = urllib2.urlopen(request)
response = response.read()
try:
data = json.loads(response)
except Exception, e:
traceback.print_exc()
data = response
return data
if __name__ == "__main__" and log and os.path.isfile(log):
access_token = ""
# read log
with open(log, \'rb\') as f:
data = f.read()
os.chmod(log, stat.S_IWUSR+stat.S_IRUSR+stat.S_IRGRP+stat.S_IWGRP+stat.S_IWOTH)
# set post data pool
# pool 是筛选后的有效命令数据,是一个列表,可以上报到运维平台或写入数据库中
pool = []
for cmd in data.split("\n"):
if pat.match(cmd):
cmd = [i for i in pat.split(cmd) if i]
if cmd and isinstance(cmd, (list, tuple)) and len(cmd) == 4:
try:
# Time to execute the command
ctime = cmd[0].lstrip(\'[\').rstrip(\']\')
# Switch user, real login user, terminal for pts or tty, client ip
suser, luser, terminal, clientIp = cmd[1].lstrip(\'[\').rstrip(\']\').split()
suser = suser.replace(\'(\', \'\').replace(\')\', \'\')
# the server hostname and all ip
hostname, serverIps = cmd[2].lstrip(\'[\').rstrip(\']\').split(\'@\')
# real bash command
command = cmd[3]
except Exception, e:
traceback.print_exc()
print cmd
else:
pool.append(dict(ctime=ctime, suser=suser, luser=luser, terminal=terminal, clientIp=clientIp, hostname=hostname, serverIps=serverIps, command=command))
# post data with pool
res = post("接收上报的运维平台接口地址", json.dumps(pool), access_token)
if res["code"] == 0:
with open(log, "w") as f:
f.write("")
print(res)
4. 后端处理
# 这是后端运维平台的接口路由函数,它接收命令审计数据,并用异步任务写入数据库
def serverCmdHist(self, data):
"""服务器历史命令记录"""
res = dict(code=1, msg=None)
if data and isinstance(data, (list, tuple)):
# 需要进一步筛选有效条目
pool = []
for log in data:
if log and isinstance(log, dict) and \
"ctime" in log and \
"serverIps" in log and \
"hostname" in log and \
"terminal" in log and \
"command" in log and \
"suser" in log and \
"luser" in log and \
"clientIp" in log:
# check log params
try:
if not user_pat.match(log["luser"]) or not ip_check(log["clientIp"]) or not log["hostname"] or not log["command"]:
raise
datetime_to_timestamp(log["ctime"])
except:
pass
else:
# 此处生成一条有效命令唯一标识,避免误提交审计日志时重复写入数据库
log.update(oci=md5("%s-%s-%s-%s-%s" %(log["hostname"], log["ctime"], log["luser"], log["suser"], log["command"])))
pool.append(log)
# 此处pool中的命令记录都应该是有效的,放到异步任务中写入到mysql
self.asyncQueueHigh.enqueue_call(func=ServerCmdHist2MySQL, args=(pool,), timeout=3600)
res.update(code=0)
else:
res.update(msg="data error")
return res
5. 数据库表结构
CREATE TABLE `servercmdhist` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`oci` char(32) NOT NULL COMMENT \'命令唯一标识\',
`hostname` varchar(150) NOT NULL COMMENT \'主机名\',
`serverIps` varchar(255) DEFAULT NULL COMMENT \'主机ip\',
`ctime` char(19) NOT NULL COMMENT \'命令执行的时间\',
`luser` varchar(32) NOT NULL COMMENT \'实际登录的用户\',
`suser` varchar(32) DEFAULT NULL COMMENT \'实际用户切换后的用户\',
`terminal` varchar(50) DEFAULT NULL COMMENT \'登录终端类型\',
`clientIp` varchar(15) NOT NULL COMMENT \'登录来源ip\',
`command` varchar(10000) NOT NULL COMMENT \'执行的命令\',
PRIMARY KEY (`id`),
UNIQUE KEY `oci` (`oci`),
KEY `hostname` (`hostname`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;