如下所示:
import java.text.DecimalFormat
import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
/**
 * Created by silentwolf on 2016/6/3.
 */
/**
 * One row of the user tag table: a user identifier followed by 27 `Float` scores.
 *
 * The scores are grouped as gender (`MAN`, `WOMAN`), age buckets
 * (`AGE10_19` .. `AGE50_59`) and interest categories (`GAME` .. `MEDICINE`).
 * `UserTagTable.main` builds instances positionally and then queries the
 * resulting table by these exact field names, so neither the order nor the
 * spelling of the fields may change.
 *
 * @param SUUID user identifier the scores belong to
 */
case class UserTag(
  SUUID: String,
  // gender scores
  MAN: Float,
  WOMAN: Float,
  // age-bucket scores
  AGE10_19: Float,
  AGE20_29: Float,
  AGE30_39: Float,
  AGE40_49: Float,
  AGE50_59: Float,
  // interest-category scores
  GAME: Float,
  MOVIE: Float,
  MUSIC: Float,
  ART: Float,
  POLITICS_NEWS: Float,
  FINANCIAL: Float,
  EDUCATION_TRAINING: Float,
  HEALTH_CARE: Float,
  TRAVEL: Float,
  AUTOMOBILE: Float,
  HOUSE_PROPERTY: Float,
  CLOTHING_ACCESSORIES: Float,
  BEAUTY: Float,
  IT: Float,
  BABY_PRODUCT: Float,
  FOOD_SERVICE: Float,
  HOME_FURNISHING: Float,
  SPORTS: Float,
  OUTDOOR_ACTIVITIES: Float,
  MEDICINE: Float
)
/**
 * Spark job that derives one [[UserTag]] row per distinct `suuid` found in the
 * `appstatistic` parquet data and saves the result to the Phoenix table
 * `USER_TAGS`.
 *
 * The tag scores themselves are random placeholder values produced by
 * [[UserTagTable.userTagInfo]]; they are not computed from user behaviour.
 */
object UserTagTable {

  val LOG = LoggerFactory.getLogger(UserTagTable.getClass)

  /** Root directory of the report data, built from the project's `AppConfig`. */
  val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"

  /**
   * Job entry point.
   *
   * @param args `args(0)` is the appkey and `args(1)` is the last day (inclusive)
   *             of the range to load; the first day is fixed to `2016-04-10`.
   *             With fewer than two arguments a usage hint is printed and
   *             nothing is executed.
   */
  def main(args: Array[String]): Unit = {
    // Both args(0) and args(1) are read below, so a single argument is not enough.
    if (args.length < 2) {
      // NOTE(review): the hint names three values, but only appkey and one date
      // are read from the arguments - confirm the intended command line.
      println("请输入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11")
    } else {
      val appkey = args(0)
      val lastdate = args(1)
      val conf: com.typesafe.config.Config = ConfigFactory.load()

      // Create the context only after the arguments are validated, and always stop it.
      val sc = new SparkContext()
      try {
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate).registerTempTable("suuidTable")
        sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))
        sqlContext.udf.register("intToString", (b: Long) => intToString(b))

        // Key step: turn every suuid of the temp table, together with the JSON
        // produced by the taginfo UDF, into a UserTag.
        // Null suuids are dropped here because the typed Row pattern below does
        // not match null; the final query excludes them as well.
        sqlContext
          .sql("select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable " +
            "where suuid is not null group by suuid")
          .map { case Row(suuid: String, taginfo: String) =>
            val tags = JSON.parseObject(taginfo)
            UserTag(suuid,
              tags.getFloat("man"),
              tags.getFloat("woman"),
              tags.getFloat("age10_19"),
              tags.getFloat("age20_29"),
              tags.getFloat("age30_39"),
              tags.getFloat("age40_49"),
              tags.getFloat("age50_59"),
              tags.getFloat("game"),
              tags.getFloat("movie"),
              tags.getFloat("music"),
              tags.getFloat("art"),
              tags.getFloat("politics_news"),
              tags.getFloat("financial"),
              tags.getFloat("education_training"),
              tags.getFloat("health_care"),
              tags.getFloat("travel"),
              tags.getFloat("automobile"),
              tags.getFloat("house_property"),
              tags.getFloat("clothing_accessories"),
              tags.getFloat("beauty"),
              tags.getFloat("IT"),
              tags.getFloat("baby_Product"),
              tags.getFloat("food_service"),
              tags.getFloat("home_furnishing"),
              tags.getFloat("sports"),
              tags.getFloat("outdoor_activities"),
              tags.getFloat("medicine")
            )
          }
          .toDF()
          .registerTempTable("resultTable")

        // appkey and lastdate are spliced into the SQL text as string literals,
        // so they must not contain single quotes.
        val resultDF = sqlContext.sql(
          s"select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," +
            "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," +
            "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," +
            "MEDICINE from resultTable WHERE SUUID IS NOT NULL")

        resultDF.write
          .mode(SaveMode.Overwrite)
          .options(Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url")))
          .format("org.apache.phoenix.spark")
          .save()
      } finally {
        sc.stop()
      }
    }
  }

  /**
   * Renders a numeric id as its decimal string; registered as the SQL UDF `intToString`.
   *
   * @param suuid numeric identifier
   * @return `suuid` as a string
   */
  def intToString(suuid: Long): String = suuid.toString

  /**
   * Produces a JSON object of random tag scores; registered as the SQL UDF `taginfo`.
   *
   * Every score is rounded to two decimals. `woman` is `1 - man`, the four
   * buckets `age10_19` .. `age40_49` are each drawn from `[0, 0.2]`, and
   * `age50_59` is `1` minus their sum. The 20 interest scores are independent
   * draws from `[0, 1]`.
   *
   * @param num1 ignored; present only so the UDF can be called with the suuid column
   * @return a JSON object string whose keys are the lower-case tag names read back in `main`
   */
  def userTagInfo(num1: String): String = {
    // Pin the symbols to Locale.ROOT so the decimal separator is always '.',
    // otherwise `.toFloat` fails under default locales that format with ','.
    val de = new DecimalFormat("0.00",
      java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT))
    def round2(x: Double): Float = de.format(x).toFloat

    val man = round2(math.random)
    val woman = round2(1 - man)

    val age10_19 = round2(math.random * 0.2)
    val age20_29 = round2(math.random * 0.2)
    val age30_39 = round2(math.random * 0.2)
    val age40_49 = round2(math.random * 0.2)
    val age50_59 = round2(1 - age10_19 - age20_29 - age30_39 - age40_49)

    // Key spelling (including "IT" and "baby_Product") must match the lookups in main.
    val interestKeys = Seq(
      "game", "movie", "music", "art", "politics_news", "financial",
      "education_training", "health_care", "travel", "automobile",
      "house_property", "clothing_accessories", "beauty", "IT", "baby_Product",
      "food_service", "home_furnishing", "sports", "outdoor_activities", "medicine")

    val fields: Seq[(String, Float)] =
      Seq(
        "man" -> man,
        "woman" -> woman,
        "age10_19" -> age10_19,
        "age20_29" -> age20_29,
        "age30_39" -> age30_39,
        "age40_49" -> age40_49,
        "age50_59" -> age50_59
      ) ++ interestKeys.map(key => key -> round2(math.random))

    fields
      .map { case (key, value) => "\"" + key + "\":" + value }
      .mkString("{", ",", "}")
  }

  /**
   * Loads the `appstatistic` parquet data under [[REP_HOME]] for one app and day range.
   *
   * @param ctx      SQL context used to read the parquet files
   * @param appkey   value the `appkey` column must equal
   * @param startDay first day (inclusive), compared against the `day` column
   * @param endDay   last day (inclusive), compared against the `day` column
   * @return rows with a non-null `timestamp` that match the app and day range
   */
  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
    val path = s"$REP_HOME/appstatistic"
    // The values are spliced into the filter expression as string literals,
    // so they must not contain single quotes.
    ctx.read.parquet(path)
      .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'")
  }
}
|
以上这篇DataFrame:通过SparkSql将scala类转为DataFrame的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/silentwolfyh/article/details/51966952