编写日期:2018-03-15
编写作者:mtsbv110
主机规划:192.168.9.131
ngx_openresty版本 |
ngx_openresty-1.7.7.2.tar.gz |
安装目录 |
/usr/local/servers |
ngx_cache_purge |
2.3.tar.gz |
安装目录 |
/usr/local/servers/ngx_openresty-1.7.7.2/bundle |
nginx_upstream_check_module |
v0.3.0.tar.gz |
安装目录 |
/usr/local/servers/ngx_openresty-1.7.7.2/bundle |
http_headers.lua |
http_headers.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
http.lua |
http.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
template.lua |
template.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
html.lua |
html.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
lua-resty-kafka-master |
lua-resty-kafka-master.zip |
安装目录 |
/usr/local/applications/hello/lualib/ |
主机规划:192.168.9.132
ngx_openresty版本 |
ngx_openresty-1.7.7.2.tar.gz |
安装目录 |
/usr/local/servers |
ngx_cache_purge |
2.3.tar.gz |
安装目录 |
/usr/local/servers/ngx_openresty-1.7.7.2/bundle |
nginx_upstream_check_module |
v0.3.0.tar.gz |
安装目录 |
/usr/local/servers/ngx_openresty-1.7.7.2/bundle |
http_headers.lua |
http_headers.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
http.lua |
http.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
template.lua |
template.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
html.lua |
html.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
lua-resty-kafka-master |
lua-resty-kafka-master.zip |
安装目录 |
/usr/local/applications/hello/lualib |
主机规划:192.168.9.133
ngx_openresty版本 |
ngx_openresty-1.7.7.2.tar.gz |
安装目录 |
/usr/local/servers |
ngx_cache_purge |
2.3.tar.gz |
安装目录 |
/usr/local/servers/ngx_openresty-1.7.7.2/bundle |
nginx_upstream_check_module |
v0.3.0.tar.gz |
安装目录 |
/usr/local/servers/ngx_openresty-1.7.7.2/bundle |
http_headers.lua |
http_headers.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
http.lua |
http.lua |
安装目录 |
/usr/local/applications/hello/lualib/resty |
在nginx接收到访问请求的时候,就把请求的流量上报发送给kafka
从lua脚本直接创建一个kafka producer,发送数据到kafka
[[email protected] local]#
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
[[email protected] local]# unzip master.zip
[[email protected] local]#cd /usr/local/applications/hello/lualib
[[email protected] lualib]#
cp -r /usr/local/lua-resty-kafka-master/lib/resty/ /usr/local/applications/hello/lualib
[[email protected] kafka]# /usr/local/servers/nginx/sbin/nginx -s reload
同理在192.168.9.132做同样的配置
在192.168.9.131和192.168.9.132中配置application目录下的lua脚本,将访问流量上报到Kafka
-- Collect request information and report it to Kafka as an access-log
-- message, then render the product detail page from the nginx shared-dict
-- cache, falling back to the backend HTTP service on a cache miss.
local cjson = require("cjson")
local producer = require("resty.kafka.producer")

-- Kafka broker cluster used for access-log reporting.
local broker_list = {
    { host = "192.168.9.131", port = 9092 },
    { host = "192.168.9.132", port = 9092 },
    { host = "192.168.9.133", port = 9092 },
}

-- ngx.req.read_body() must be called before get_body_data() and returns
-- nothing, so it is invoked as a statement. (The original assigned its nil
-- result to log_json["body"], which silently produced no such JSON key.)
ngx.req.read_body()

-- Build the access-log payload from the current request.
local log_json = {}
log_json["request_module"] = "product_detail_info"
log_json["headers"] = ngx.req.get_headers()
log_json["uri_args"] = ngx.req.get_uri_args()
log_json["http_version"] = ngx.req.http_version()
log_json["method"] = ngx.req.get_method()
-- NOTE(review): "raw_reader" looks like a typo for "raw_header", but the key
-- is kept as-is because downstream consumers may already depend on it.
log_json["raw_reader"] = ngx.req.raw_header()
log_json["body_data"] = ngx.req.get_body_data()

local message = cjson.encode(log_json)

local uri_args = ngx.req.get_uri_args()
local productId = uri_args["productId"]
local shopId = uri_args["shopId"]

-- Fire-and-forget async send, keyed by productId; a send failure must not
-- abort the page request, so it is only logged (the original captured err
-- but never checked it).
local async_producer = producer:new(broker_list, { producer_type = "async" })
local ok, send_err = async_producer:send("access-log", productId, message)
if not ok then
    ngx.log(ngx.ERR, "kafka send failed: ", send_err)
end

local cache_ngx = ngx.shared.my_cache
local productCacheKey = "product_info_" .. productId
local shopCacheKey = "shop_info_" .. shopId

local productCache = cache_ngx:get(productCacheKey)
local shopCache = cache_ngx:get(shopCacheKey)

-- Fetch `path` from the backend service, cache the body for 10 minutes under
-- `cache_key`, and return the body. Returns nil and logs on failure — the
-- original dereferenced resp.body without a nil check, so any backend outage
-- crashed the request with a 500.
local function fetch_and_cache(path, cache_key)
    local http = require("resty.http")
    local httpc = http.new()
    local resp, req_err = httpc:request_uri("http://192.168.9.137:8080", {
        method = "GET",
        path = path,
    })
    if not resp then
        ngx.log(ngx.ERR, "backend request failed for ", path, ": ", req_err)
        return nil
    end
    cache_ngx:set(cache_key, resp.body, 10 * 60)
    return resp.body
end

if productCache == "" or productCache == nil then
    productCache = fetch_and_cache("/getProductInfo?productId=" .. productId, productCacheKey)
end

if shopCache == "" or shopCache == nil then
    shopCache = fetch_and_cache("/getShopInfo?shopId=" .. shopId, shopCacheKey)
end

local productCacheJSON = cjson.decode(productCache)
local shopCacheJSON = cjson.decode(shopCache)

-- Template context for the product detail page.
local context = {
    productId = productCacheJSON.id,
    productName = productCacheJSON.name,
    productPrice = productCacheJSON.price,
    productPictureList = productCacheJSON.pictureList,
    productSpecification = productCacheJSON.specification,
    productService = productCacheJSON.service,
    productColor = productCacheJSON.color,
    productSize = productCacheJSON.size,
    shopId = shopCacheJSON.id,
    shopName = shopCacheJSON.name,
    shopLevel = shopCacheJSON.level,
    shopGoodCommentRate = shopCacheJSON.goodCommentRate,
}

local template = require("resty.template")
template.render("product.html", context)
需要在nginx.conf中,http部分,加入resolver 8.8.8.8;
修改完成之后重启Nginx
[[email protected] config]# /usr/local/servers/nginx/sbin/nginx -s reload
同理将上述的操作同时在192.168.9.132上操作
需要在三台机器的Kafka配置文件server.properties中分别加入advertised.host.name=192.168.9.13X(X分别为1、2、3,对应各自本机IP),然后重启三个Kafka进程
创建一个Kafka的topic(消息通道)
[[email protected] kafka]# bin/kafka-topics.sh --create --zookeeper 192.168.9.131:2181,192.168.9.132:2182,192.168.9.133:2183 --partitions 1 --replication-factor 1 --topic access-log
测试Kafka消息通道
bin/kafka-console-consumer.sh --zookeeper 192.168.9.131:2181,192.168.9.132:2182,192.168.9.133:2183 --topic access-log --from-beginning
在浏览器中访问以下地址进行测试,可以在Kafka消费端看到上报的访问日志数据
http://192.168.9.133/product?requestPath=product&productId=1&shopId=1