I am trying to use the MaxMind GeoIP API for Scala/Spark, which can be found at https://github.com/snowplow/scala-maxmind-iplookups. I load in the file using the standard call:
val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
I have a basic CSV file that I load in, which contains timestamps and IP addresses:
val sweek1 = week1.map { line => IP(parse(line)) }.collect {
  case Some(ip) => {
    val ipadress = ipdetect(ip.ip)
    (ip.time, ipadress)
  }
}
The function ipdetect is basically defined by:
def ipdetect(a: String) = {
  ipLookups.performLookups(a)._1 match {
    case Some(value) => value.toString
    case _ => "Unknown"
  }
}
When I run this program, it throws "Task not serializable". I read a few posts and there seem to be a few ways around this:
1. a wrapper
2. using SparkContext.addFile (which distributes the file across the cluster)
but I cannot work out how either of them works. I tried the wrapper, but I don't know how and where to call it. I tried addFile, but it returns Unit instead of a String, so I assume I would somehow need to pipe the binary file. I am not sure what to do now. Any help is much appreciated.
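For context, the "wrapper" pattern from those posts, as far as I understand it, looks roughly like this. This is only a sketch (the object name IpLookupWrapper is my own), and it assumes GeoLiteCity.dat exists at the same path on every worker node:

import com.snowplowanalytics.maxmind.iplookups.IpLookups

// Hypothetical wrapper object: the lookup is built lazily on first use in each
// executor JVM, so the non-serializable IpLookups never travels inside a closure.
object IpLookupWrapper {
  lazy val ipLookups: IpLookups =
    IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)

  def lookup(ip: String): String =
    ipLookups.performLookups(ip)._1 match {
      case Some(value) => value.toString
      case _           => "Unknown"
    }
}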
I have since been able to get around the serialization issue by using mapPartitions and iterating over each local partition, but I wonder if there is a more efficient way to do this, as my dataset is in the range of millions of rows.
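Roughly what that looks like (a sketch of my mapPartitions attempt; again it assumes GeoLiteCity.dat is available locally on every worker):

// One IpLookups instance per partition, built on the executor itself,
// so nothing non-serializable is captured by the closure on the driver.
val sweek1 = week1.map(line => IP(parse(line)))
  .collect { case Some(ip) => ip }
  .mapPartitions { iter =>
    val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
    iter.map { ip =>
      val city = ipLookups.performLookups(ip.ip)._1 match {
        case Some(value) => value.toString
        case _ => "Unknown"
      }
      (ip.time, city)
    }
  }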
1 Answer
#1
Assume that your CSV file contains one IP address per line and that, for example, you want to map each IP address to a city.
import com.snowplowanalytics.maxmind.iplookups.IpLookups
val geoippath = "path/to/geoip.dat"
val sc = new SparkContext(new SparkConf().setAppName("IP Converter"))
sc.addFile(geoippath)
def parseIP(ip: String, ipLookups: IpLookups): String = {
  val lookupResult = ipLookups.performLookups(ip)
  // lookupResult._1 is an Option of the location; fall back to "" when no city is found
  lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
}
val logs = sc.textFile("path/to/your.csv")
.mapWith(_ => IpLookups(geoFile = Some(SparkFiles.get("geoip.dat"))))(parseIP)
For other IP transformations, please refer to Scala MaxMind IP Lookups. Furthermore, mapWith seems to be deprecated; use mapPartitionsWithIndex instead, as sketched below.
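A rough equivalent with mapPartitionsWithIndex (one IpLookups instance per partition; this sketch reuses parseIP from above and the geoip.dat file distributed via sc.addFile in the snippet):

import org.apache.spark.SparkFiles

// Build one IpLookups per partition on the executor; the partition index
// is not needed here, so it is ignored.
val logs = sc.textFile("path/to/your.csv")
  .mapPartitionsWithIndex { (_, iter) =>
    val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("geoip.dat")))
    iter.map(ip => parseIP(ip, ipLookups))
  }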