Spout的实现步骤:
· 对文件的改变进行分开的监听,并监视文件夹下有无新日志文件加入。
· 在数据得到了字段的说明后,将其转换成tuple。
· 声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。
Spout的详细编码在Listing Three中显示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。
1. public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )
2. {
3. _collector = collector;
4. try
5. {
6. fileReader = new BufferedReader(new FileReader(new File(file)));
7. }
8. catch (FileNotFoundException e)
9. {
10. System.exit(1);
11. }
12. }
13.
14. public void nextTuple()
15. {
16. protected void ListenFile(File file)
17. {
18. Utils.sleep(2000);
19. RandomAccessFile access = null;
20. String line = null;
21. try
22. {
23. while ((line = access.readLine()) != null)
24. {
25. if (line !=null)
26. {
27. String[] fields=null;
28. if (tupleInfo.getDelimiter().equals("|")) fields = line.split("\\"+tupleInfo.getDelimiter());
29. else
30. fields = line.split (tupleInfo.getDelimiter());
31. if (tupleInfo.getFieldList().size() == fields.length) _collector.emit(new Values(fields));
32. }
33. }
34. }
35. catch (IOException ex){ }
36. }
37. }
38.
39. public void declareOutputFields(OutputFieldsDeclarer declarer)
40. {
41. String[] fieldsArr = new String [tupleInfo.getFieldList().size()];
42. for(int i=0; i<tupleInfo.getFieldList().size(); i++)
43. {
44. fieldsArr = tupleInfo.getFieldList().get(i).getColumnName();
45. }
46. declarer.declare(new Fields(fieldsArr));
47. }
declareOutputFileds()决定了tuple发射的格式,这种话Bolt就能够用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有加入Spout就会进行读入而且发送给Bolt进行处理。
很多其它精彩内容请关注:http://bbs.superwu.cn
关注超人学院微信二维码:
|
相关文章
- 1、win10下连接本地系统上的Linux操作系统(分别以Nat方式和桥接模式实现)
- 使用jQuery实现简单的拖动效果
- js+css实现骰子的随机转动
- pygame 精灵的行走及二段跳实现方法
- 【MySQL笔记】Excel数据导入Mysql数据库的实现方法——Navicat
- SQL WHILE 循环中的游标 用例,SQL中实现循环操作
- c#中@标志的作用 C#通过序列化实现深表复制 细说并发编程-TPL 大数据量下DataTable To List效率对比 【转载】C#工具类:实现文件操作File的工具类 异步多线程 Async .net 多线程 Thread ThreadPool Task .Net 反射学习
- java中多态的实现机制
- [知了堂学习笔记]_Java代码实现MySQL数据库的备份与还原
- Openblas编译Android NDK库的步骤