案例一
rewrite_by_lua '
--引入openresty自带的json处理对象
local cjson = require("cjson")
local producer = require "resty.kafka.producer"
-- 定义kafka broker地址,ip需要和kafka的host.name配置一致
local broker_list = {
{ host = "192.168.115.28", port = 9092 },
{ host = "192.168.115.29", port = 9092 },
{ host = "192.168.115.30", port = 9092 }
}
-- 定义json便于日志数据整理收集
local log_json = {}
local request_method = ngx.req.get_method
if "GET" == request_method then
-- 普通get请求
ngx.log(ngx.ERR,"不支持GET请求")
log_json["start_time"]=ngx.req.start_time()
log_json["header"]=ngx.req.raw_header()
log_json["uri"]=ngx.req.get_uri_args()
log_json["headers"]=ngx.req.get_headers()
log_json["body"]=ngx.req.read_body()
log_json["body_data"]=ngx.req.get_body_data()
elseif "POST" == request_method then
ngx.req.read_body()
local body_data = ngx.req.get_body_data() --body_data可是符合http协议的请求体,不是普通的字符串
--请求体的size大于nginx配置里的client_body_buffer_size,则会导致请求体被缓冲到磁盘临时文件里,client_body_buffer_size默认是8k或者16k
if not body_data then
ngx.log(ngx.WARN,"no request body found" )
end
log_json["start_time"]=ngx.req.start_time()
log_json["header"]=ngx.req.raw_header()
log_json["uri"]=ngx.req.get_uri_args()
log_json["post"]=ngx.req.get_post_args()
log_json["headers"]=ngx.req.get_headers()
log_json["body"]=ngx.req.read_body()
log_json["body_data"]=ngx.req.get_body_data()
end
ngx.log(ngx.INFO,"log_json",cjson.encode(log_json))
-- 转换json为字符串
local message = cjson.encode(log_json);
-- 定义kafka异步生产者
local bp = producer:new(broker_list, { producer_type = "async" })
-- 发送日志消息,send第二个参数key,用于kafka路由控制:
-- key为nill(空)时,一段时间向同一partition写入数据
-- 指定key,按照key的hash写入到对应的partition
local ok, err = bp:send("postman1", nil, message)
if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end
';
案例二
server {
listen 80;
server_name localhost;
location /favicon.ico {
root html;
index index.html index.htm;
}
location / {
proxy_connect_timeout 8;
proxy_send_timeout 8;
proxy_read_timeout 8;
proxy_buffer_size 4k;
proxy_buffers 512 8k;
proxy_busy_buffers_size 8k;
proxy_temp_file_write_size 64k;
proxy_next_upstream http_500 http_502 http_503 http_504 error timeout invalid_header;
root html;
index index.html index.htm;
proxy_pass http://rc;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 使用log_by_lua 包含lua代码,因为log_by_lua指令运行在请求最后且不影响proxy_pass机制
rewrite_by_lua '
-- 引入lua所有api
local cjson = require "cjson"
local producer = require "resty.kafka.producer"
-- 定义kafka broker地址,ip需要和kafka的host.name配置一致
local broker_list = {
{ host = "10.10.78.52", port = 9092 },
}
-- 定义json便于日志数据整理收集
local log_json = {}
log_json["uri"]=ngx.var.uri
log_json["args"]=ngx.var.args
log_json["host"]=ngx.var.host
log_json["request_body"]=ngx.var.request_body
log_json["remote_addr"] = ngx.var.remote_addr
log_json["remote_user"] = ngx.var.remote_user
log_json["time_local"] = ngx.var.time_local
log_json["status"] = ngx.var.status
log_json["body_bytes_sent"] = ngx.var.body_bytes_sent
log_json["http_referer"] = ngx.var.http_referer
log_json["http_user_agent"] = ngx.var.http_user_agent
log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for
log_json["upstream_response_time"] = ngx.var.upstream_response_time
log_json["request_time"] = ngx.var.request_time
-- 转换json为字符串
local message = cjson.encode(log_json);
-- 定义kafka异步生产者
local bp = producer:new(broker_list, { producer_type = "async" })
-- 发送日志消息,send第二个参数key,用于kafka路由控制:
-- key为nill(空)时,一段时间向同一partition写入数据
-- 指定key,按照key的hash写入到对应的partition
local ok, err = bp:send("test1", nil, message)
if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end
';
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
参考:https://blog.csdn.net/u011239989/article/details/52239785
#user nobody;
worker_processes 1;
#error_log logs/error.log;
#error_log logs/error.log notice;
error_log logs/error.log info;
pid logs/nginx.pid;
events {
use epoll;
worker_connections 1024;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log logs/access.log main;
sendfile on;
tcp_nopush on;
#keepalive_timeout 0;
keepalive_timeout 65;
#gzip on;
lua_package_path "lua-resty-kafka/lib/?.lua;;";
client_body_buffer_size 128m;
server {
listen 80;
server_name localhost;
location / {
root html;
index index.html index.htm;
}
location ^~ /api/getuserinfo {
rewrite_by_lua '
-- 引入lua所有api
local cjson = require("cjson")
local producer = require "resty.kafka.producer"
-- 定义kafka broker地址,ip需要和kafka的host.name配置一致
local broker_list = {
{ host = "192.168.115.28", port = 9092 },
{ host = "192.168.115.29", port = 9092 },
{ host = "192.168.115.30", port = 9092 }
}
-- 定义json便于日志数据整理收集
local log_json = {}
ngx.req.read_body()
log_json["start_time"]=ngx.req.start_time()
log_json["header"]=ngx.req.raw_header()
log_json["uri"]=ngx.req.get_uri_args()
log_json["post"]=ngx.req.get_post_args()
log_json["headers"]=ngx.req.get_headers()
log_json["body_data"]=ngx.req.get_body_data()
log_json["uri"]=ngx.var.uri
log_json["args"]=ngx.var.args
log_json["host"]=ngx.var.host
log_json["request_body"]=ngx.var.request_body
log_json["remote_addr"] = ngx.var.remote_addr
log_json["remote_user"] = ngx.var.remote_user
log_json["time_local"] = ngx.var.time_local
log_json["status"] = ngx.var.status
log_json["body_bytes_sent"] = ngx.var.body_bytes_sent
log_json["http_referer"] = ngx.var.http_referer
log_json["http_user_agent"] = ngx.var.http_user_agent
log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for
log_json["upstream_response_time"] = ngx.var.upstream_response_time
log_json["request_time"] = ngx.var.request_time
-- 转换json为字符串
local message = cjson.encode(log_json);
-- 定义kafka异步生产者
local bp = producer:new(broker_list, { producer_type = "async" })
-- 发送日志消息,send第二个参数key,用于kafka路由控制:
-- key为nill(空)时,一段时间向同一partition写入数据
-- 指定key,按照key的hash写入到对应的partition
local ok, err = bp:send("postman1", nil, message)
if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end
';
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
}
nginx+lua+kafka 编写 在线日志上报系统的更多相关文章
-
【原创】运维基础之OpenResty(Nginx+Lua)+Kafka
使用docker部署 1 下载 # wget https://github.com/doujiang24/lua-resty-kafka/archive/v0.06.tar.gz# tar xvf v ...
-
ELK+Kafka集群日志分析系统
ELK+Kafka集群分析系统部署 因为是自己本地写好的word文档复制进来的.格式有些出入还望体谅.如有错误请回复.谢谢! 一. 系统介绍 2 二. 版本说明 3 三. 服务部署 3 1) JDK部 ...
-
flink---实时项目--day01--1. openrestry的安装 2. 使用nginx+lua将日志数据写入指定文件中 3. 使用flume将本地磁盘中的日志数据采集到的kafka中去
1. openrestry的安装 OpenResty = Nginx + Lua,是⼀一个增强的Nginx,可以编写lua脚本实现⾮非常灵活的逻辑 (1)安装开发库依赖 yum install -y ...
-
Openresty+Lua+Kafka实现日志实时采集
简介 在很多数据采集场景下,Flume作为一个高性能采集日志的工具,相信大家都知道它.许多人想起Flume这个组件能联想到的大多数都是Flume跟Kafka相结合进行日志的采集,这种方案有很多他的优点 ...
-
实战:一种在http请求中使用protobuffer+nginx+lua收集打点日志的方案
背景 app打点日志的上报和收集,是互联网公司的基本需求. 一.方案选择 1.1 protobuffer vs json 探究一种以最高效的方式上报和解析打点数据是一个系统性的问题,需要解决的子问题有 ...
-
ELK+kafka构建日志收集系统
ELK+kafka构建日志收集系统 原文 http://lx.wxqrcode.com/index.php/post/101.html 背景: 最近线上上了ELK,但是只用了一台Redis在 ...
-
【转载】scribe、chukwa、kafka、flume日志系统对比
原文地址:http://www.ttlsa.com/log-system/scribe-chukwa-kafka-flume-log-system-contrast/ 1. 背景介绍许多公司的平台每天 ...
-
scribe、chukwa、kafka、flume日志系统对比 -摘自网络
1. 背景介绍许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等),处理这些日志需要特定的日志系统,一般而言,这些系统需要具有以下特征:(1) 构建应用系统和分析系统的桥梁 ...
-
Go语言学习之11 日志收集系统kafka库实战
本节主要内容: 1. 日志收集系统设计2. 日志客户端开发 1. 项目背景 a. 每个系统都有日志,当系统出现问题时,需要通过日志解决问题 b. 当系统机器比较少时,登陆到服务器上查看即可 ...
随机推荐
-
Mobile Web中URL设计问题
自己虽然也注册了CSDN账号,但是没有在上面发表过博客等内容.不过经常在Google里面搜索相关内容时,会显示csdn的结果.这也说明国内很多IT人员都会在CSDN发表博客,记录解决问题过程或者想法. ...
-
jQuery文档加载完毕的几种写法
js中文档加载完毕.一般在body加一个onload事件或者window.onload = function () {} jQuery中有好多写法,平时也不注意,别人一问,还真觉得头大. 下面是我整理 ...
-
第三章 MySQL高级查询(一)
第三章 MySQL高级查询(一) 一.SQL语言的四个分类 1. DML(Data Manipulation Language)(数据操作语言):用来插入,修改和删除表中的数据,如INSE ...
-
C语言程序注释风格
良好编程习惯的养成对于一个程序员的发展非常重要,而注释对于一份程序来讲又是一个必不可少的组成部分,今天来研究一下C语言程序的注释风格. 注释是源码程序中非常重要的一部分,一般情况下,源程序有效注释量必 ...
-
Windows Server 2016-MS服务器应用程序兼容性列表
该表罗列支持 Window Server 2016 上安装和功能的 Microsoft 服务器应用程序. 此信息用于快速参考,不用于替代有关单个产品的规格.要求.公告或每个服务器应用程序的常规通信的说 ...
-
[GIT] 更新.repo目录
cd .repo/manifests/ git co -f git pull
-
【webpack】中clean-weabpack-plugin使用方法
在webpack中打包的文件通常是通过hash生成的,如果文件改动,那么打包的文件就会越来越多,如果想清除之前的文件,可以使用clean-weabpack-plugin插件来处理 注意版本号:我使 ...
-
python 带正则的search 模块
glob 是python 提供的一个支持正则表达式的查找文件的模块. 实现上采用了os.listdir() 和 fnmatch.fnmatch(). 但是没有真的invoking a subshe ...
-
20155312 2016-2017-2 《Java程序设计》第九周学习总结
20155312 2016-2017-2 <Java程序设计>第九周学习总结 课堂内容总结 两个类有公用的东西放在父类里. 面向对象的三要素 封装 继承 多态:用父类声明引用,子类生成对象 ...
-
InnoDB: Operating system error number 87 in a file operation. 错误87的解决方法
InnoDB: Operating system error number 87 in a file operation. 错误87的解决方法 140628 8:10:48 [Note] Plugi ...