nginx+ flume

时间:2022-05-20 08:11:18

nginx 作用: 做负载均衡  
nginx和lvs的区别:nginx可以做反向代理
1、上传nginx安装包  tar -zxvf tengine-2.1.0
2、安装环境  
依赖 gcc openssl-devel pcre-devel zlib-devel
安装:yum install gcc openssl-devel pcre-devel zlib-devel -y
3、 安装Nginx
./configure
make && make install
4、在/etc/rc.d/init.d 目录下注册nginx文件

vi nginx
#!/bin/sh
#
# nginx - this script starts and stops the nginx daemon
#
# chkconfig: - 85 15
# description: Nginx is an HTTP(S) server, HTTP(S) reverse \
# proxy and IMAP/POP3 proxy server
# processname: nginx
# config: /etc/nginx/nginx.conf
# config: /etc/sysconfig/nginx
# pidfile: /var/run/nginx.pid # Source function library.
. /etc/rc.d/init.d/functions # Source networking configuration.
. /etc/sysconfig/network # Check that networking is up.
[ "$NETWORKING" = "no" ] && exit 0 nginx="/usr/local/nginx/sbin/nginx"
prog=$(basename $nginx) NGINX_CONF_FILE="/usr/local/nginx/conf/nginx.conf" [ -f /etc/sysconfig/nginx ] && . /etc/sysconfig/nginx lockfile=/var/lock/subsys/nginx make_dirs() {
# make required directories
user=`nginx -V 2>&1 | grep "configure arguments:" | sed 's/[^*]*--user=\([^ ]*\).*/\1/g' -`
options=`$nginx -V 2>&1 | grep 'configure arguments:'`
for opt in $options; do
if [ `echo $opt | grep '.*-temp-path'` ]; then
value=`echo $opt | cut -d "=" -f 2`
if [ ! -d "$value" ]; then
# echo "creating" $value
mkdir -p $value && chown -R $user $value
fi
fi
done
} start() {
[ -x $nginx ] || exit 5
[ -f $NGINX_CONF_FILE ] || exit 6
make_dirs
echo -n $"Starting $prog: "
daemon $nginx -c $NGINX_CONF_FILE
retval=$?
echo
[ $retval -eq 0 ] && touch $lockfile
return $retval
} stop() {
echo -n $"Stopping $prog: "
killproc $prog -QUIT
retval=$?
echo
[ $retval -eq 0 ] && rm -f $lockfile
return $retval
} restart() {
configtest || return $?
stop
sleep 1
start
} reload() {
configtest || return $?
echo -n $"Reloading $prog: "
killproc $nginx -HUP
RETVAL=$?
echo
} force_reload() {
restart
} configtest() {
$nginx -t -c $NGINX_CONF_FILE
} rh_status() {
status $prog
} rh_status_q() {
rh_status >/dev/null 2>&1
} case "$1" in
start)
rh_status_q && exit 0
$1
;;
stop)
rh_status_q || exit 0
$1
;;
restart|configtest)
$1
;;
reload)
rh_status_q || exit 7
$1
;;
force-reload)
force_reload
;;
status)
rh_status
;;
condrestart|try-restart)
rh_status_q || exit 0
;;
*)
echo $"Usage: $0 {start|stop|status|restart|condrestart|try-restart|reload|force-reload|configtest}"
exit 2
esac

5、给该文件一个执行权限  chmod +x nginx
6、添加该文件到系统服务中去
    chkconfig --add nginx
    查看是否添加成功
    chkconfig --list nginx
7、nginx启动  service nginx start  
8、启动之后用浏览器访问   查看是否启动成功  node2:
9、修改/usr/local/nginx/conf/nginx.conf
a) 、 修改格式化方式  
  log_format my_format '$remote_addr^A$msec^A$http_host^A$request_uri';
 
 location =/log.gif {
        default_type image/gif;
        access_log /opt/data/access.log my_format;
10、修改之后需要重新登录  service nginx reload

flume 知识点总结  
一、 安装flume  
1、上传flume安装包  
      解压安装包: tar -zxvf apache-flume-1.6.0
2、修改解压包的名称 : mv apache-flume-1.6.0-bin  flume
3、修改配置文件夹的名称: mv flume-env.sh.template flume-env.sh
4、在配置文件夹 flume-env.sh 文件夹中配置flume java 环境变量  
(注:在冒号模式下寻找java配置文件的位置: /JAVA,寻找环境变量中对java环境变量的配置位置 echo $JAVA_HOME)
5、在环境变量配置文件夹中配置java环境变量  
vi /etc/profile   添加FLUME_HOME 的配置  
FLUME_HOME= /root/flume
将配置文件添加到path路径下面   . /etc/profile
6、配置完成之后查看文件配置是否成功  
flume-ng version  如果能够显示flume的版本 说明文件配置成功了
7、添加自定义配置文件

Source、Channel、Sink有哪些类型
    Flume Source
    Source类型                   | 说明
    Avro Source                 | 支持Avro协议(实际上是Avro RPC),内置支持
    Thrift Source               | 支持Thrift协议,内置支持
    Exec Source                 | 基于Unix的command在标准输出上生产数据
    JMS Source                   | 从JMS系统(消息、主题)中读取数据
    Spooling Directory Source | 监控指定目录内数据变更
    Twitter 1% firehose Source|    通过API持续下载Twitter数据,试验性质
    Netcat Source               | 监控某个端口,将流经端口的每一个文本行数据作为Event输入
    Sequence Generator Source | 序列生成器数据源,生产序列数据
    Syslog Sources               | 读取syslog数据,产生Event,支持UDP和TCP两种协议
    HTTP Source                 | 基于HTTP POST或GET方式的数据源,支持JSON、BLOB表示形式
    Legacy Sources               | 兼容老的Flume OG中Source(0.9.x版本)

Flume Channel
    Channel类型       说明
    Memory Channel                | Event数据存储在内存中
    JDBC Channel                  | Event数据存储在持久化存储中,当前Flume Channel内置支持Derby
    File Channel                  | Event数据存储在磁盘文件中
    Spillable Memory Channel   | Event数据存储在内存中和磁盘上,当内存队列满了,会持久化到磁盘文件
    Pseudo Transaction Channel | 测试用途
    Custom Channel                | 自定义Channel实现

Flume Sink
    Sink类型     说明
    HDFS Sink             | 数据写入HDFS
    Logger Sink           | 数据写入日志文件
    Avro Sink             | 数据被转换成Avro Event,然后发送到配置的RPC端口上
    Thrift Sink           | 数据被转换成Thrift Event,然后发送到配置的RPC端口上
    IRC Sink              | 数据在IRC上进行回放
    File Roll Sink         | 存储数据到本地文件系统
    Null Sink             | 丢弃到所有数据
    HBase Sink             | 数据写入HBase数据库
    Morphline Solr Sink | 数据发送到Solr搜索服务器(集群)
    ElasticSearch Sink     | 数据发送到Elastic Search搜索服务器(集群)
    Kite Dataset Sink     | 写数据到Kite Dataset,试验性质的
    Custom Sink           | 自定义Sink实现

案例1、 A simple example
    http://flume.apache.org/FlumeUserGuide.html#a-simple-example

配置文件
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node2
a1.sources.r1.port = 44444 # Describe the sink
a1.sinks.k1.type = logger # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

启动flume
flume-ng agent -n a1 -c conf -f option -Dflume.root.logger=INFO,console
注意:启动命令启动的位置  一定要在option文件所在的文件夹下

安装telnet
yum install telnet
退出 ctrl+]  quit

Memory Chanel 配置
  capacity:默认该通道中最大的可以存储的event数量是100,
  trasactionCapacity:每次最大可以source中拿到或者送到sink中的event数量也是100
  keep-alive:event添加到通道中或者移出的允许时间
  byte**:即event的字节量的限制,只包括eventbody

-----配置多节点的flume  
1、将node2配置的配置文件发送到node3
scp -r flume/ root@node3:/root/
2、配置node3节点的环境变量
vi /etc/profile

案例2、两个flume做集群

#node2
############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node2
a1.sources.r1.port = 44444 # Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 60000 # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

node02服务器中,安装Flume(步骤略)
    配置文件

############################################################
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node3
a1.sources.r1.port = 60000 # Describe the sink
a1.sinks.k1.type = logger # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

先启动node02的Flume
    flume-ng agent  -n a1 -c conf -f avro.conf -Dflume.root.logger=INFO,console

flume-ng agent --conf -file option2 --name a1  -Dflume.root.logger=INFO,console
    
    再启动node01的Flume
    flume-ng agent  -n a1 -c conf -f simple.conf2 -Dflume.root.logger=INFO,console
    
    打开telnet 测试  node02控制台输出结果

node3显示这些的时候证明连接成功了

注意事项: 在配置的时候,需要注意节点之间的名称,同时需要注意启动顺序  先启动客户端node3 在启动服务端 node2

------将多个flume上的日志内容收集到一个服务器上    解决单点故障问题  
flume可以进行断点续传

--案例三:execu source --执行源 通过一个unix命令监控数据源

Exec Source
http://flume.apache.org/FlumeUserGuide.html#exec-source 配置文件
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/dirflume/log.txt # Describe the sink
a1.sinks.k1.type = logger # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################ 启动Flume
flume-ng agent -n a1 -c conf -f exec.conf -Dflume.root.logger=INFO,console 创建空文件演示 touch flume.exec.log
循环添加数据
for i in {1..50}; do echo "$i hi flume" >> flume.exec.log ; sleep 0.1; done

--案例四 :  读取具有指定格式的文件夹

Spooling Directory Source
http://flume.apache.org/FlumeUserGuide.html#spooling-directory-source
配置文件
############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/flume/log/
a1.sources.r1.fileHeader = false # Describe the sink
a1.sinks.k1.type = logger # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

启动Flume
    flume-ng agent -n a1 -c conf -f spool.conf -Dflume.root.logger=INFO,console

拷贝文件演示
    mkdir logs
    cp flume.exec.log logs/

原始目录中有文件会进行读取,原始目录中没有文件,添加文件后也会进行读取  
  添加后缀名之后再进行校验:  a1.sources.r1.fileSuffix=.wcg

--注:断电续传的功能是需要去进行配置的  
---案例五:将flume中的数据导入到hdfs中  
hdfs sink
        http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
    
        配置文件

############################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true # Describe the sink
***只修改上一个spool sink的配置代码块 a1.sinks.k1.type = logger
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://bjsxt/flume/%Y-%m-%d/%H%M ##每隔60s或者文件大小超过10M的时候产生新文件
# hdfs有多少条消息时新建文件,0不基于消息个数
a1.sinks.k1.hdfs.rollCount=0
# hdfs创建多长时间新建文件,0不基于时间
a1.sinks.k1.hdfs.rollInterval=60
# hdfs多大时新建文件,0不基于文件大小
a1.sinks.k1.hdfs.rollSize=10240
# 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
a1.sinks.k1.hdfs.idleTimeout=3 a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp=true ## 每五分钟生成一个目录:
# 是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式
a1.sinks.k1.hdfs.round=true
# 时间上进行“舍弃”的值;
a1.sinks.k1.hdfs.roundValue=5
# 时间上进行”舍弃”的单位,包含:second,minute,hour
a1.sinks.k1.hdfs.roundUnit=minute # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
############################################################

注:flume是通过hdfs的环境变量,默认找到hdfs的配置位置

----用flume获取nginx的日志,并上传到hdfs

# project

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1 # Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/access.log # Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /log/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.rollInterval=0
a1.sinks.k1.hdfs.rollSize=102400
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=10
a1.sinks.k1.hdfs.callTimeOut=40000
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.fileType=DataStream # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1