首页
编程随笔
Java笔记
Html/Css/Js
Android
后端笔记
服务器搭建
BUG收集
Java异常
Android异常
在线工具
Json格式化
编码/解码
Epub在线编辑
登录
发布文章
个人文章
退出登录
首页
技术教程
BUG收集
在线工具
资源下载
登录
发布文章
退出登录
搜索
当前位置:
首页
-
博客
- 正文
关闭
Flume安装并整合到Kafka
更新时间:2023-06-25 10:33:31
阅读数:705
发布者:落幕
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。 本文flume版本为apache-flume-1.11.0 环境准备 域名 : ip 主机1 test1:192.168.225.128 主机2 test2:192.168.225.129 主机3 test3:192.168.225.130 ### 1、下载并安装 进入官网下载最新Flume版本即可 ```txt (1)Flume官网地址:http://flume.apache.org/ (2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html (3)下载地址:https://flume.apache.org/download.html ``` 解压 tar -xvzf apache-flume-1.11.0-bin.tar.gz ### 2、配置flume 进入Flume根目录,在job目录下创建file_to_kafka.conf vim jobs/file_to_kafka.conf 配置文件如下 ```config # 1 组件定义 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 2 配置source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 # 要监听日志文件,注意改成*.log有问题,待解决 a1.sources.r1.filegroups.f1 = /opt/logs/app.* # flume自动生成 a1.sources.r1.positionFile = /opt/flume/taildir_position.json # 3 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 4 配置sink # 发送目的地 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # kafka集群 a1.sinks.k1.kafka.bootstrap.servers = test1:9092,test2:9092,test3:9092 # 发送到kafka的topic a1.sinks.k1.kafka.topic = first a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 # 5 拼接组件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 ``` 主要需要更改的配置如下: ![flume安装](https://www.speechb.com/blog/flume_install.png "flume安装") ### 3、启动 bin/flume-ng agent -c conf/ -n a1 -f jobs/file_to_kafka.conf & ### 4、测试 (1)启动一个topic为first的kafka消费者 (2)编辑app.log文件,新增一行 vi /opt/logs/app.log (3) 启动的kafka消费者能消费到即安装配置成功