CentOS 基于OpenResty(Nginx+Lua)完成访问流量实时上报Kafka

时间:2024-05-18 16:40:51

编写日期:2018-03-15

编写作者:mtsbv110

邮    箱:[email protected]

 

 

主机规划:192.168.9.131

ngx_openresty版本

ngx_openresty-1.7.7.2.tar.gz

安装目录

/usr/local/servers  

ngx_cache_purge

2.3.tar.gz

安装目录

/usr/local/servers/ngx_openresty-1.7.7.2/bundle

nginx_upstream_check_module

v0.3.0.tar.gz

安装目录

/usr/local/servers/ngx_openresty-1.7.7.2/bundle

http_headers.lua

http_headers.lua

安装目录

/usr/local/applications/hello/lualib/resty

http.lua

http.lua

安装目录

/usr/local/applications/hello/lualib/resty

template.lua

template.lua

安装目录

/usr/local/applications/hello/lualib/resty

html.lua

html.lua

安装目录

/usr/local/applications/hello/lualib/resty

lua-resty-kafka-master

lua-resty-kafka-master.zip

安装目录

/usr/local/applications/hello/lualib/

 

主机规划:192.168.9.132

ngx_openresty版本

ngx_openresty-1.7.7.2.tar.gz

安装目录

/usr/local/servers  

ngx_cache_purge

2.3.tar.gz

安装目录

/usr/local/servers/ngx_openresty-1.7.7.2/bundle

nginx_upstream_check_module

v0.3.0.tar.gz

安装目录

/usr/local/servers/ngx_openresty-1.7.7.2/bundle

http_headers.lua

http_headers.lua

安装目录

/usr/local/applications/hello/lualib/resty

http.lua

http.lua

安装目录

/usr/local/applications/hello/lualib/resty

template.lua

template.lua

安装目录

/usr/local/applications/hello/lualib/resty

html.lua

html.lua

安装目录

/usr/local/applications/hello/lualib/resty

lua-resty-kafka-master

lua-resty-kafka-master.zip

安装目录

/usr/local/applications/hello/lualib

 

主机规划:192.168.9.133

ngx_openresty版本

ngx_openresty-1.7.7.2.tar.gz

安装目录

/usr/local/servers  

ngx_cache_purge

2.3.tar.gz

安装目录

/usr/local/servers/ngx_openresty-1.7.7.2/bundle

nginx_upstream_check_module

v0.3.0.tar.gz

安装目录

/usr/local/servers/ngx_openresty-1.7.7.2/bundle

http_headers.lua

http_headers.lua

安装目录

/usr/local/applications/hello/lualib/resty

http.lua

http.lua

安装目录

/usr/local/applications/hello/lualib/resty

 

 

在nginx接收到访问请求的时候,就把请求的流量上报发送给kafka

从lua脚本直接创建一个kafka producer,发送数据到kafka

 

[[email protected] local]#

 wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip

 

[[email protected] local]# unzip master.zip

[[email protected] local]#cd /usr/local/applications/hello/lualib

[[email protected] lualib]#

 cp -r /usr/local/lua-resty-kafka-master/lib/resty/ /usr/local/applications/hello/lualib

CentOS 基于OpenResty(Nginx+Lua)完成访问流量实时上报Kafka

[[email protected] kafka]# /usr/local/servers/nginx/sbin/nginx -s reload

 

同理在192.168.9.132做同样的配置

 

 

在192.168.9.131和192.168.9.132中配置appication中lua,对流量进行上报KafKa的上报

local cjson = require("cjson")

local producer = require("resty.kafka.producer")

 

local broker_list = {  

    { host = "192.168.9.131", port = 9092 },  

    { host = "192.168.9.132", port = 9092 },  

    { host = "192.168.9.133", port = 9092 }

}

 

local log_json = {}

log_json["request_module"]="product_detail_info"

log_json["headers"] = ngx.req.get_headers()  

log_json["uri_args"] = ngx.req.get_uri_args()  

log_json["body"] = ngx.req.read_body()  

log_json["http_version"] = ngx.req.http_version()  

log_json["method"] =ngx.req.get_method()

log_json["raw_reader"] = ngx.req.raw_header()  

log_json["body_data"] = ngx.req.get_body_data()  

 

local message = cjson.encode(log_json);  

 

local uri_args = ngx.req.get_uri_args()

local productId = uri_args["productId"]

local shopId = uri_args["shopId"]

 

 

local async_producer = producer:new(broker_list, { producer_type = "async" })   

local ok, err = async_producer:send("access-log", productId, message)  

 

local cache_ngx = ngx.shared.my_cache

 

local productCacheKey = "product_info_"..productId

local shopCacheKey = "shop_info_"..shopId

 

local productCache = cache_ngx:get(productCacheKey)

local shopCache = cache_ngx:get(shopCacheKey)

 

if productCache == "" or productCache == nil then

        local http = require("resty.http")

        local httpc = http.new()

 

        local resp, err = httpc:request_uri("http://192.168.9.137:8080",{

                method = "GET",

                path = "/getProductInfo?productId="..productId

        })

 

        productCache = resp.body

        cache_ngx:set(productCacheKey, productCache, 10 * 60)

end

 

if shopCache == "" or shopCache == nil then

        local http = require("resty.http")

        local httpc = http.new()

 

        local resp, err = httpc:request_uri("http://192.168.9.137:8080",{

                method = "GET",

                path = "/getShopInfo?shopId="..shopId

        })

 

        shopCache = resp.body

        cache_ngx:set(shopCacheKey, shopCache, 10 * 60)

end

 

 

local productCacheJSON = cjson.decode(productCache)

local shopCacheJSON = cjson.decode(shopCache)

 

local context = {

        productId = productCacheJSON.id,

        productName = productCacheJSON.name,

        productPrice = productCacheJSON.price,

        productPictureList = productCacheJSON.pictureList,

        productSpecification = productCacheJSON.specification,

        productService = productCacheJSON.service,

        productColor = productCacheJSON.color,

        productSize = productCacheJSON.size,

        shopId = shopCacheJSON.id,

        shopName = shopCacheJSON.name,

        shopLevel = shopCacheJSON.level,

        shopGoodCommentRate = shopCacheJSON.goodCommentRate

}

 

local template = require("resty.template")

template.render("product.html", context)

 

CentOS 基于OpenResty(Nginx+Lua)完成访问流量实时上报Kafka

 

 

需要在nginx.conf中,http部分,加入resolver 8.8.8.8;

修改完成之后重启Nginx

[[email protected] config]# /usr/local/servers/nginx/sbin/nginx -s reload

 

同理将上述的操作同时在192.168.9.132上操作

 

需要在kafka中加入advertised.host.name = 192.168.9.13X(1,2,3),重启三个kafka进程

创建一个KafKa的消息通道

 

[[email protected] kafka]# bin/kafka-topics.sh --create --zookeeper 192.168.9.131:2181,192.168.9.132:2182,192.168.9.133:2183 --partitions 1 --replication-factor 1 --topic access-log

 

测试Kafka消息通道

 bin/kafka-console-consumer.sh --zookeeper 192.168.9.131:2181,192.168.9.132:2182,192.168.9.133:2183 --topic access-log --from-beginning

 

在浏览器上测试,可以从KafKa 消费端获取相应的数据

http://192.168.9.133/product?requestPath=product&productId=1&shopId=1

CentOS 基于OpenResty(Nginx+Lua)完成访问流量实时上报Kafka