用Nifi 从web api 取数据到HDFS

时间:2023-03-09 16:43:00
用Nifi 从web api 取数据到HDFS

1. 全景图

用Nifi 从web api 取数据到HDFS

2. 用ExecuteScript生成动态日期参数

为了只生成一个flowfile:
用Nifi 从web api 取数据到HDFS
Groovy 代码:

import org.apache.commons.io.IOUtils
import java.nio.charset.*
import java.text.SimpleDateFormat;
import java.lang.StringBuilder;
import java.util.Calendar;

def flowFile = session.create()

def days = 10000

flowFile = session.write(flowFile, {inputStream, outputStream ->
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
Calendar cal = Calendar.getInstance();
StringBuilder sb = new StringBuilder();

cal.add(Calendar.DATE,1)

for(int i = 0; i < days; i++) {
  cal.add(Calendar.DATE, -1);
  sb.append(sdf.format(cal.getTime()) + "\n" );
 }

//println(sb);

outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

//flowFile = session.putAttribute(flowFile, 'filename', 'get_date')
session.transfer(flowFile, REL_SUCCESS)

 

3. 用SplitText生成每行一个的日期

Line Split Count    1

 

4. 用ExtractText 取到日期参数

用Nifi 从web api 取数据到HDFS

 

5. 用UpdateAttribute生成url及filename

用Nifi 从web api 取数据到HDFS

这里一定要设置filename,不然,所有的文件名都一样,最后只能成功插入一个记录到HDFS。

 

6.  用InvokeHttp获取数据

用Nifi 从web api 取数据到HDFS

用Nifi 从web api 取数据到HDFS

 

7. 添加一个 RouteOnContent来过滤空数据

用Nifi 从web api 取数据到HDFS

 

8. 用PutHDFS把数据插入到HDFS

用Nifi 从web api 取数据到HDFS

注意这里的Directory 要加上/, 不然就插入到user/root/nifi下了,而不是files下在的nifi了。

 

9. 每天更新数据

用Nifi 从web api 取数据到HDFS

每天20点更新数据

代码小改下:

def count = 1

 

 

NIFI 中国社区 QQ群:595034369