nginx lua集成kafka的实现方法

时间:2022-04-22 11:45:35

第一步:进入opresty目录

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[root@node03 openresty]# cd /export/servers/openresty/
[root@node03 openresty]# ll
total 356
drwxr-xr-x 2 root root  4096 Jul 26 11:33 bin
drwxrwxr-x 44 1000 1000  4096 Jul 26 11:31 build
drwxrwxr-x 43 1000 1000  4096 Nov 13 2017 bundle
-rwxrwxr-x 1 1000 1000 45908 Nov 13 2017 configure
-rw-rw-r-- 1 1000 1000 22924 Nov 13 2017 COPYRIGHT
drwxr-xr-x 6 root root  4096 Jul 26 11:33 luajit
drwxr-xr-x 6 root root  4096 Aug 1 08:14 lualib
-rw-r--r-- 1 root root  5413 Jul 26 11:32 Makefile
drwxr-xr-x 11 root root  4096 Jul 26 11:35 nginx
drwxrwxr-x 2 1000 1000  4096 Nov 13 2017 patches
drwxr-xr-x 44 root root  4096 Jul 26 11:33 pod
-rw-rw-r-- 1 1000 1000  3689 Nov 13 2017 README.markdown
-rw-rw-r-- 1 1000 1000  8690 Nov 13 2017 README-win32.txt
-rw-r--r-- 1 root root 218352 Jul 26 11:33 resty.index
drwxr-xr-x 5 root root  4096 Jul 26 11:33 site
drwxr-xr-x 2 root root  4096 Aug 1 10:54 testlua
drwxrwxr-x 2 1000 1000  4096 Nov 13 2017 util
[root@node03 openresty]#

说明:接下来我们关注两个目录 lualib 和 nginx

1.lualib: 是存放opresty所需要的集成软件包的

2.nginx: 是nginx服务目录

接下来,我们进入lualib目录一看究竟:

?
1
2
3
4
5
6
7
8
[root@node03 openresty]# cd lualib/
[root@node03 lualib]# ll
total 116
-rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
drwxr-xr-x 3 root root  4096 Jul 26 11:33 ngx
drwxr-xr-x 2 root root  4096 Jul 26 11:33 rds
drwxr-xr-x 2 root root  4096 Jul 26 11:33 redis
drwxr-xr-x 9 root root  4096 Aug 1 10:34 resty

这里我们看到了redis和ngx集成软件包,说明我们可以之间使用nginx和redis而无需导入任何依赖包!!!!

下面看看resty里面有些说明呢????

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[root@node03 lualib]# cd resty/
[root@node03 resty]# ll
total 152
-rw-r--r-- 1 root root 6409 Jul 26 11:33 aes.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 core
-rw-r--r-- 1 root root  596 Jul 26 11:33 core.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 dns
drwxr-xr-x 2 root root 4096 Aug 1 10:42 kafka  #这是我们自己导入的
drwxr-xr-x 2 root root 4096 Jul 26 11:33 limit
-rw-r--r-- 1 root root 4616 Jul 26 11:33 lock.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 lrucache
-rw-r--r-- 1 root root 4620 Jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root 1211 Jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root  616 Jul 26 11:33 random.lua
-rw-r--r-- 1 root root 9227 Jul 26 11:33 redis.lua
-rw-r--r-- 1 root root 1192 Jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root 1221 Jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root 1359 Jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root  236 Jul 26 11:33 sha.lua
-rw-r--r-- 1 root root  698 Jul 26 11:33 string.lua
-rw-r--r-- 1 root root 5178 Jul 26 11:33 upload.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 upstream
drwxr-xr-x 2 root root 406 Jul 26 11:33 websocket

这里我们看到了熟悉的mysql.lua和redis.lua,好了其他的先不要管

注意:这里的 kafka 这个包是没有的,说明opnresty么有集成kafka。此处我已经提前导入啦kafka集成包

我们看看kafka里面多有哪些包:

?
1
2
3
4
5
6
7
8
9
10
11
[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root 1369 Aug 1 10:42 broker.lua
-rw-r--r-- 1 root root 5537 Aug 1 10:42 client.lua
-rw-r--r-- 1 root root  710 Aug 1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug 1 10:42 producer.lua
-rw-r--r-- 1 root root 4072 Aug 1 10:42 request.lua
-rw-r--r-- 1 root root 2118 Aug 1 10:42 response.lua
-rw-r--r-- 1 root root 1494 Aug 1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root 4845 Aug 1 10:42 sendbuffer.lua

附上 kafka 集成包:kafka.rar

第二步:创建kafka测试lua文件

1.退回到openresty

?
1
[root@node03 kafka]# cd /export/servers/openresty/

2.创建测试文件

?
1
2
[root@node03 openresty]# mkdir -r testlua
#这里文件名自己取,文件位置自己定,但必须找得到

这里文件名自己取,文件位置自己定,但必须找得到!!!!!!!!!!!下面会用到!!!!!!!!!!

3.进入刚刚创建的文件夹并创建kafkalua.lua脚本文件

创建文件:vim kafkalua.lua或者touch kafkalua.lua

?
1
2
3
4
[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug 1 10:54 kafkalua.lua

kafkalua.lua:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
--测试语句可以不用
ngx.say('hello kafka file configuration successful!!!!!!')
 
--数据采集阈值限制,如果lua采集超过阈值,则不采集
local DEFAULT_THRESHOLD = 100000
-- kafka分区数
local PARTITION_NUM = 6
-- kafka主题名称
local TOPIC = 'B2CDATA_COLLECTION1'
-- 轮询器共享变量KEY值
local POLLING_KEY = "POLLING_KEY"
-- kafka集群(定义kafka broker地址,ip需要和kafka的host.name配置一致)
local function partitioner(key, num, correlation_id)
  return tonumber(key)
end
--kafka broker列表
local BROKER_LIST = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}}
--kafka参数,
local CONNECT_PARAMS = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner }
-- 共享内存计数器,用于kafka轮询使用
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
  pollingVal = 1
  shared_data:set(POLLING_KEY, pollingVal)
end
--获取每一条消息的计数器,对PARTITION_NUM取余数,均衡分区
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)
 
-- 并发控制
local isGone = true
--获取ngx.var.connections_active进行过载保护,即如果当前活跃连接数超过阈值进行限流保护
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
  isGone = false
end
-- 数据采集
if isGone then
 
  local time_local = ngx.var.time_local
  if time_local == nil then
    time_local = ""
  end
 
  local request = ngx.var.request
  if request == nil then
    request = ""
  end
 
  local request_method = ngx.var.request_method
  if request_method == nil then
    request_method = ""
  end
 
  local content_type = ngx.var.content_type
  if content_type == nil then
    content_type = ""
  end
  ngx.req.read_body()
  local request_body = ngx.var.request_body
  if request_body == nil then
    request_body = ""
  end
 
  local http_referer = ngx.var.http_referer
  if http_referer == nil then
    http_referer = ""
  end
 
  local remote_addr = ngx.var.remote_addr
  if remote_addr == nil then
    remote_addr = ""
  end
 
  local http_user_agent = ngx.var.http_user_agent
  if http_user_agent == nil then
    http_user_agent = ""
  end
 
  local time_iso8601 = ngx.var.time_iso8601
  if time_iso8601 == nil then
    time_iso8601 = ""
  end
 
  local server_addr = ngx.var.server_addr
  if server_addr == nil then
    server_addr = ""
  end
 
  local http_cookie = ngx.var.http_cookie
  if http_cookie == nil then
    http_cookie = ""
  end
--封装数据
  local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;
--引入kafka的producer
local producer = require "resty.kafka.producer"
--创建producer
local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
--发送数据
local ok, err = bp:send(TOPIC, partitions, message)
--打印错误日志
  if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
  end
end

第三步:修改nginx配置文件nginx.conf

1.进入ngin/conf目录

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug 1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf

2.修改nginx.conf

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
[root@node03 conf]# vim nginx.conf
 
    #1.说明找到第一个server
    #2.在server上面添加两行代码如下
    #3.在server里面添加kafka相关的代码如下
    
    
#------------------添加的代码---------------------------------------
 #开启共享字典,设置内存大小为10M,供每个nginx的线程消费
 lua_shared_dict shared_data 10m;
 #配置本地域名解析
 resolver 127.0.0.1;
#------------------添加的代码---------------------------------------
 
 server {
    listen    80;
    server_name localhost;
 
    #charset koi8-r;
 
    #access_log logs/host.access.log main;
    location / {
      root  html;
      index index.html index.htm;
    }
 
    #------------------添加的代码---------------------------------------
    location /kafkalua { #这里的kafkalua就是工程名字,不加默认为空
      #开启nginx监控
      stub_status on;
      #加载lua文件
      default_type text/html;
      #指定kafka的lua文件位置,就是我们刚才创建的kafkalua.lua(前面已经强调要记住的!!!!)
      content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
    #------------------添加的代码---------------------------------------
}

说明:location /kafkalua{...}这里的kafkalua是工程名,可以随意取也可以不取,但是必须要记住!!!

看到我们上面配置了两个location,第一个为location /{...}第二个为location /kafkalua{...}那么他们有什么区别呢???先向下看,迷雾将会慢慢揭开。

第四步:启动nginx

1.进入nginx/sbin

?
1
2
3
4
[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]# ll
total 16356
-rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx

2.测试配置文件是否正确

?
1
2
3
4
[root@node03 sbin]# nginx -t
nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
#看到已经成功啦

3.启动nginx

?
1
2
[root@node03 sbin]# nginx
#不显示任何东西一般是成功啦

4.查看nginx是否启动成功

?
1
2
3
4
5
6
[root@node03 sbin]# ps -ef | grep nginx
root    3730   1 0 09:24 ?    00:00:00 nginx: master process nginx
nobody   3731  3730 0 09:24 ?    00:00:20 nginx: worker process is shutting down
nobody   5766  3730 0 12:17 ?    00:00:00 nginx: worker process
root    5824  3708 0 12:24 pts/1  00:00:00 grep nginx
<span class="hljs-comment">#看到有两个nginx进程,表示成功le</span>

5.浏览器访问nginx

在浏览器输入:node03/kafkalua

说明:如何么有配置hosts则输入openresty所在设备的地址如:192.168.52.120/kafkalua

nginx lua集成kafka的实现方法

在浏览器输入:node03/或者 192.168.52.120/

nginx lua集成kafka的实现方法

再在浏览器输入:node03:80/kafkalua 和 node03:80/试试 搬来nginx.conf来看看:

node03:80/kafkalua 这里的nide03是服务器的别名或者之间写文服务器地址,80是【listen 80;】配置的监听端口,80端口可以省略不写,如果这写成【listen 8088;】那么浏览器需输入 node03:8088/kafkalua (这里不能省略8088),kafkalua是工程名。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
server {
   listen    80;
   server_name localhost;
 
   #charset koi8-r;
 
   #access_log logs/host.access.log main;
   location / {
     root  html;
     index index.html index.htm;
   }
 
   #------------------添加的代码---------------------------------------
   location /kafkalua { #这里的kafkalua就是工程名字,不加默认为空
     #开启nginx监控
     stub_status on;
     #加载lua文件
     default_type text/html;
     #指定kafka的lua文件位置,就是我们刚才创建的kafkalua.lua(前面已经强调要记住的!!!!)
     content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
   }

第五步:创建测试爬虫程序

1.创建maven工程导入依赖

?
1
2
3
4
5
6
7
8
9
10
11
12
<dependencies>
   <dependency>
     <groupId>org.jsoup</groupId>
     <artifactId>jsoup</artifactId>
     <version>1.11.3</version>
   </dependency>
   <dependency>
     <groupId>org.apache.httpcomponents</groupId>
     <artifactId>httpclient</artifactId>
     <version>4.5.4</version>
   </dependency>
 </dependencies>

2.伪爬虫程序

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
public class SpiderGoAirCN {
  private static String basePath = "http://node03/kafkalua";
  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 50000; i++) {
      // 请求查询信息
      spiderQueryao();
      // 请求html
      spiderHtml();
      // 请求js
      spiderJs();
      // 请求css
      spiderCss();
      // 请求png
      spiderPng();
      // 请求jpg
      spiderJpg();
      Thread.sleep(100);
    }
  }
 
  /**
   *
   * @throws Exception
   */
  public static void spiderQueryao() throws Exception {
    // 1.指定目标网站   ^.*/B2C40/query/jaxb/direct/query.ao.*$
    String url = basePath + "/B2C40/query/jaxb/direct/query.ao";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
          "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1="
            + getGoTime() + "&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.80");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "243.45.78.132");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D"
            + getGoTime()
            + "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1("
            + getGoTime() + ")");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }
 
  public static void spiderHtml() throws Exception {
    // 1.指定目标网站     ^.*html.*$
    String url = basePath + "/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    // httpPost.setEntity(new StringEntity(
    // "depcity=CAN&arrcity=WUH&flightdate=20180220&adultnum=1&childnum=0&infantnum=0&cabinorder=0&airline=1&flytype=0&international=0&action=0&segtype=1&cache=0&preUrl=&isMember="));
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }
 
  public static void spiderJs() throws Exception {
 
    // 1.指定目标网站
    String url = basePath +"/B2C40/dist/main/modules/common/requireConfig.js";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }
 
  public static void spiderCss() throws Exception {
 
    // 1.指定目标网站
    String url = basePath +"/B2C40/dist/main/css/flight.css";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader("Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }
 
  public static void spiderPng() throws Exception {
 
    // 1.指定目标网站
    String url =basePath + "/B2C40/dist/main/images/common.png";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }
 
  public static void spiderJpg() throws Exception {
 
    // 1.指定目标网站
    String url = basePath +"/B2C40/dist/main/images/loadingimg.jpg";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }
 
  public static String getLocalDateTime() {
    DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00",
        Locale.ENGLISH);
    String nowAsISO = df.format(new Date());
    return nowAsISO;
 
  }
 
  public static String getISO8601Timestamp() {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
    String nowAsISO = df.format(new Date());
    return nowAsISO;
  }
 
  public static String getGoTime() {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
    String nowAsISO = df.format(new Date());
    return nowAsISO;
  }
 
  public static String getBackTime() {
    Date date = new Date();// 取时间
    Calendar calendar = new GregorianCalendar();
    calendar.setTime(date);
    calendar.add(calendar.DATE, +1);// 把日期往前减少一天,若想把日期向后推一天则将负数改为正数
    date = calendar.getTime();
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
    String dateString = formatter.format(date);
    return dateString;
  }
}

第六步:启动kafka

1.创建主题topic

?
1
2
[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3
--replication-factor 3 --create --topic B2CDATA_COLLECTION1

2.开启kafka消费者

?
1
2
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092
--topic B2CDATA_COLLECTION1

第七步:开启爬虫程序并观察结果

1.启动爬虫程序

2.观察消费者窗口如下

nginx lua集成kafka的实现方法

第八步:启动kafka-manager观察

1.启动kafka-manager

?
1
2
3
4
5
6
7
8
9
10
11
[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]# ll
total 36
-rwxr-xr-x 1 root root 13747 May 1 06:27 kafka-manager
-rw-r--r-- 1 root root 9975 May 1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root 1383 May 1 06:27 log-config
-rw-r--r-- 1 root root  105 May 1 06:27 log-config.bat
[root@node01 bin]#
 
#启动
[root@node01 bin]# ./kafka-manager

启动后的窗口:

nginx lua集成kafka的实现方法

2.浏览器访问

浏览器输入:node01:9000

nginx lua集成kafka的实现方法

kafka manager使用不做讲解,观察B2CDATA_COLLECTION1主题消费情况:

​ 有三个分区,每个分区消费的消息差多说明成功啦,

​ 如果不一样,则是kafkalua.lua 脚本中没有配置分区策略,默认分区会导致 数据倾斜 我们需配置自己的分区策略!

nginx lua集成kafka的实现方法

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://www.cnblogs.com/-xiaoyu-/p/11294905.html