flume 自定义 hbase sink

2024-12-16 21:52:43
推荐回答(1个)
回答1:

业务需求 flume需要从kafka获取数据并写入hbase
开始写的想法:按照flume的流程:一个source ,三个channel, 三个sink,因为我需要三个列族,如果使用官方的hbase sink那么需要三个sink。而且需要自定义一个source的拦截器,根据kafka获取的数据匹配不不同的channel,三个channel对应三个列族,然后配置到sink,就可以使用官方hbase的sink插入数据了。
实现:
1. 自定义一个拦截器

自定义拦截器

将自定义拦截器打成jar包,放到flume的lib目录,有依赖的包也需要将jar包一并放入,不然会报找不到包异常
conf/flume-diysource.conf 配置信息

一切准备就绪flume启动命令

控制台打印信息没报什么错误
查看hbase,hbase的列族是分对了,但是他把整一个kafka读取的数据当做一个value写入一列,而且列名是默认的,并不是我想要的。
于是..............diy开始了
当我在看flume的时候看到关于kafka channel是 这样写的

思路:根据 3 我们可以不需要再写一个source,直接channel 到 sink 一撸到底,只需要在sink上进行habse的相关操作
直接自定义 sink
依赖信息

自定义类 MyHbaseSink

自定义完毕,开始配置文件 ,这个配置就比较简单 conf/flume-diysource.conf 文件

一切准备好了,将自定义的sink打成jar包,放到flume,直接运行

后台运行

完美运行,habse在哗啦啦的写入!

总结: 业务需求是将kafka的数据写入到hbase,开始是想用官方的sink,结果是我太天真了,官方的hbase sink的rowKey并不满足业务需求,而且kafka的数据字段是不确定的,搞了半天,白忙活。发现自己定义的比较符合业务需求。
但是,自定义的sink也是比较坑的,开始自定sink,我把处理event的逻辑全都放在process() 方法里,结果很抽风,根本就没有执行到我的逻辑里,然后我在官网process()方法的上看到这么一句话: Send the Event to the external repository.

大概意思是让我把event发送到外部库,于是我把event处理独立出process()。结果amazing,成功写入hbase了,果然运气不错!