spark学习-33-Spark安全机制SecurityManager

时间:2021-09-27 18:28:20

1。官网请查看:http://spark.apache.org/docs/latest/security.html,本文是翻译加工
 现在,Spark支持通过共享秘钥进行认证。启用认证功能可以通过参数spark.authenticate来配置。此参数控制spark通信协议是否使用共享秘钥进行认证。这种认证方式基于握手机制,以确保通信双方都有相同的共享秘钥时才能通信。如果共享秘钥不一致,则双方将无法通信。可以通过以下过程来创建共享秘钥:

  1.在spark on YARN部署模式下,配置spark.authenticate为true,就可以自动产生并分发共享秘钥。每个应用程序都使用唯一的共享秘钥。
  2.其他部署方式下,应当在每个节点上都配置参数spark.authenticate.secret。此秘钥将由所有Master、worker及应用程序来使用。
  3.注意:实验性的Netty shuffle路径(spark.shuffle.use.netty)是不安全的,因此,如果启用认证功能就不要使用Netty for shuffle了。

Web UI

   通过设置参数spark.ui.filters来使用javax servlet filters就可以实现安全的spark UI。如果用户有某些不该让其他人看到的数据,那么该用户就会要求UI也能有安全性。用户指定了Java servlet filter就可以进行认证了。接下来,一旦用户登录,spark就可以在ACL列表中查询该用户是否有权查看UI。配置参数spark.acls.enable和spark.ui.view.acls控制着ACL的行为。注意:启动应用程序的用户总是有权访问UI。在YARN上,spark UI使用标准的YARN web代理机制并通过已安装的Hadoop filters进行认证。

  spark还支持通过修改ACL表来控制哪个用户有权修改正在运行着的spark应用程序。比如,kill一个应用程序或任务。这类操作是通过配置spark.acls.enable和spark.modify.acls来控制的。注意:如果你正在认证Web UI,那么,为了使用Web UI上的kill按钮,你就必须把用户添加到view acls表中。在YARN上,修改后的acls被传入并控制着哪个用户可以通过YARN接口来访问web UI。

   如果有多个管理员存在,那么spark允许在acls中指定多个管理员,让他们总可以查看所有应用程序,以及修改应用的权限。这一功能是由配置参数spark.admin.acls来控制的。这在共享集群上是很有用的,因为这类集群中往往有多个管理员或者帮助用户调试程序的技术支持人员。

事件日志

  如果应用程序正在使用事件日志记录功能,那么,应当手动创建用来存放事件日志的路径(spark.eventLog.dir),并赋予其合适的权限。如果你想让这些日志文件也是安全的,那么,该路径的权限应当设为drwxrwxrwxt。该路径的所有者应该是正在运行history server进程的那个超级用户,而且用户组权限应限制为超级用户组。这样做可以让所有用户都能对该路径执行写操作,但会阻止那些未经授权的用户删除或重命名文件,除非他们是该文件或者路径的所有者。事件日志文件由spark创建并赋予权限,比如只有所有者及其所在用户组有读写权限。

网络安全(配置端口)

  spark大量使用网络,而且有些环境严格要求使用严密的防火墙设置。下面是spark用于通信的主要端口,以及如何配置这些端口。

仅适用于Standalone模式的端口
spark学习-33-Spark安全机制SecurityManager

适用于所有集群管理器的通用端口
spark学习-33-Spark安全机制SecurityManager

下面来看看源代码解释

1。SecurityManager的初始化是在创建SparkEnv的时候。

 // ===================== 1.创建安全管理器SecurityManager ======================
// 安全管理器是什么呢?请看http://blog.csdn.net/qq_21383435/article/details/78560364
val securityManager = new SecurityManager(conf, ioEncryptionKey) // TODO:调试注释 securityManager:SecurityManager@2227 conf:SparkConf@2141 ioEncyptionKey:"none"
ioEncryptionKey.foreach { _ =>
// 检查是否应启用网络加密。
if (!securityManager.isEncryptionEnabled()) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
}

看看new都做了什么

 // allow all users/groups to have view/modify permissions
// 允许用户/组拥有查看/修改的权限
private val WILDCARD_ACL = "*" // ACL权限通配符(wildcard的意思通配符)

// 这里配置的是这个属性spark.authenticate,默认为false
private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
// keep spark.ui.acls.enable for backwards compatibility with 1.0
private var aclsOn =
sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))

// admin acls should be set before view or modify acls
// Admin ACL前应设置查看或修改ACL
private var adminAcls: Set[String] =
stringToSet(sparkConf.get("spark.admin.acls", ""))

// admin group acls should be set before view or modify group acls
// DMIN组ACL前应设置查看或修改组的ACL
private var adminAclsGroups : Set[String] =
stringToSet(sparkConf.get("spark.admin.acls.groups", ""))

private var viewAcls: Set[String] = _

private var viewAclsGroups: Set[String] = _

// list of users who have permission to modify the application. This should
// apply to both UI and CLI for things like killing the application.
// 具有修改应用程序权限的用户列表。这应该适用于UI和CLI等用于杀死应用程序的东西。
private var modifyAcls: Set[String] = _

private var modifyAclsGroups: Set[String] = _

// always add the current user and SPARK_USER to the viewAcls
// 随时添加当前用户和spark_user的viewacls
private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
Utils.getCurrentUserName()) // TODO:调试注释 defaultAclUsers:"Set$Set2" size =2 ,这里获取了 0=“hzjs” 1="root"

这里有两个关键的地方

// 这里配置的是这个属性spark.authenticate,默认为false
private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)

这个决定是否开启SecurityManager

/ always add the current user and SPARK_USER to the viewAcls
// 随时添加当前用户和spark_user的viewacls
private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
Utils.getCurrentUserName()) // TODO:调试注释 defaultAclUsers:"Set$Set2" size =2 ,这里获取了 0=“hzjs” 1="root"

这里调试的时候,获取了0=“hzjs” 1=”root”,hzjs是我的windows用户名,(这里应该是把hzjs用户赋予root权限去运行spark程序)

紧接着

 setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))

setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));

这几句话大致打印了以下内容

17/12/21 15:55:49 INFO SecurityManager: Changing view acls to: hzjs,root
17/12/21 15:55:49 INFO SecurityManager: Changing modify acls to: hzjs,root
17/12/21 15:55:49 INFO SecurityManager: Changing view acls groups to:
17/12/21 15:55:49 INFO SecurityManager: Changing modify acls groups to:
17/12/21 15:55:49 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users with view permissions: Set(hzjs, root); groups with view permissions: Set(); users with modify permissions: Set(hzjs, root); groups with modify permissions: Set()

然后是secretKey,大致和ssh免登陆的秘钥效果

 // 想生成secretKey必须spark.authenticate=true,而且要在yarn模式下运行
private val secretKey = generateSecretKey()
/**
* Generates or looks up the secret key.
*
* 生成或查找密钥。
*
* The way the key is stored depends on the Spark deployment mode. Yarn
* uses the Hadoop UGI.
*
* 密钥存储方式取决于Spark部署模式。Yarn采用Hadoop UGI。
*
* For non-Yarn deployments, If the config variable is not set
* we throw an exception.
*
* 对于non-Yarn部署模式,如果配置变量没有设置,我们将抛出一个异常。
*/

private def generateSecretKey(): String = {
if (!isAuthenticationEnabled) {
null
} else if (SparkHadoopUtil.get.isYarnMode) {
// In YARN mode, the secure cookie will be created by the driver and stashed in the
// user's credentials, where executors can get it. The check for an array of size 0
// is because of the test code in YarnSparkHadoopUtilSuite.
val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY) //TODO: secretKey:null
if (secretKey == null || secretKey.length == 0) {
logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
val rnd = new SecureRandom()
val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE //TODO: length:32
val secret = new Array[Byte](length)
rnd.nextBytes(secret)

val cookie = HashCodes.fromBytes(secret).toString() //TODO: cookie:"wery7237rwuhr732582irwberyu238grfuyewgr78....."
SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
cookie
} else {
new Text(secretKey).toString
}
} else {
// user must have set spark.authenticate.secret config
// For Master/Worker, auth secret is in conf; for Executors, it is in env variable
Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
.orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
case Some(value) => value
case None =>
throw new IllegalArgumentException(
"Error: a secret key must be specified via the " +
SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
}
}
}

想要看这个生成了什么,我们从这段代码可以看到仅仅是设置spark.authenticate=true是不能的,还要是yarn模式下运行,这里我们需要看这个
Spark:Yarn-cluster和Yarn-client区别与联系
http://blog.csdn.net/qq_21383435/article/details/78864753

 System.setProperty("hadoop.home.dir", "F:\\02-hadoop\\hadoop-2.7.3\\");
System.setProperty("HADOOP_USER_NAME", "root");
System.setProperty("yarn.resourcemanager.hostname", "192.168.10.83");


var _sparkSession:SparkSession = SparkSession
.builder()
.appName("spark_scala_HBaseTest_lcc_yarn_client")
.master("yarn-client")
.config("spark.yarn.dist.files", "F:\\002-spark\\00-IDEAWork\\SparkOnHbaseScala\\config\\hadoop\\yarn-site.xml")
.config("spark.yarn.jars", "hdfs://192.168.10.82:8020/spark/jars/jars/")
.config("spark.authenticate", true) //启动安全管理
.getOrCreate()

最后generateSecretKey()方法生成了一个32位的字符串

最后是验证口令

// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// only set once.
// 使用HTTP连接设置口令认证
// 设定自己的验证器妥善协商HTTP连接/密码的用户。这是从HTTP服务器获取HTTP客户端的需要。把它放在这里,它只被设置一次。
// 注意这一段话,必须设置spark.authenticate为true,但是设置了这个,如果不是yarn模式运行会报错
if (authOn) {
Authenticator.setDefault(
new Authenticator() {
override def getPasswordAuthentication(): PasswordAuthentication = {
var passAuth: PasswordAuthentication = null
val userInfo = getRequestingURL().getUserInfo()
if (userInfo != null) {
val parts = userInfo.split(":", 2)
passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
}
return passAuth
}
}
)
}