I want to create a server socket that listens on one specific host, whose IP and hostname I know ahead of time (it shows up under that hostname in the YARN node list). But I can't seem to get it to listen on that host without letting it fail an arbitrary number of times beforehand.
There's a Flume receiver that has the sort of host-specific functionality I'm looking for.
FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
My receiver code:
import java.io.{BufferedReader, InputStreamReader}
import java.net.{InetAddress, ServerSocket}

import org.apache.spark.Logging // public trait in Spark 1.x
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class TCPServerReceiver(hostname: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
  }

  private def receive() {
    /* This is where the job fails until it happens to start on the correct host */
    val server = new ServerSocket(port, 50, InetAddress.getByName(hostname))
    var userInput: String = null
    try {
      // Keep accepting connections until the receiver is asked to stop
      while (!isStopped) {
        try {
          val s = server.accept()
          val in = new BufferedReader(new InputStreamReader(s.getInputStream()))
          userInput = in.readLine()
          while (!isStopped && userInput != null) {
            store(userInput)
            userInput = in.readLine()
          }
        } catch {
          case e: java.net.ConnectException =>
            restart("Error connecting to " + port, e)
          case t: Throwable =>
            restart("Error receiving data", t)
        }
      }
    } finally {
      // Release the port so that a restarted receiver can bind to it again
      server.close()
    }
  }
}
And then to test it while it's running:
echo 'this is a test' | nc <hostname> <port>
This all works when I run as a local client, but when the job is submitted to a YARN cluster, the logs show the receiver trying to start in containers on other hosts, and every one of those attempts fails because the hostname doesn't match that of the container:
java.net.BindException: Cannot assign requested address
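For context, "Cannot assign requested address" is the error a bind raises when the requested IP is not assigned to any network interface of the machine the code is running on. Below is a minimal sketch of a check for that condition, using only the JDK. The object name LocalAddressCheck and the helper isLocalAddress are hypothetical names introduced here for illustration; they are not part of Spark.

```scala
import java.net.{InetAddress, NetworkInterface}

object LocalAddressCheck {
  // True if the address is the wildcard, a loopback address, or is
  // assigned to one of this machine's network interfaces.
  def isLocalAddress(addr: InetAddress): Boolean =
    addr.isAnyLocalAddress || addr.isLoopbackAddress ||
      NetworkInterface.getByInetAddress(addr) != null

  def main(args: Array[String]): Unit = {
    // Hypothetical usage: pass the hostname the receiver wants to bind to
    val target = if (args.nonEmpty) args(0) else "127.0.0.1"
    val addr = InetAddress.getByName(target)
    println(s"$target is local: ${isLocalAddress(addr)}")
  }
}
```

A check like this could be logged at the top of receive() to confirm which container the receiver actually landed in; it does not influence where the receiver is scheduled.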
Eventually (after several minutes), the receiver happens to start on the correct host and creates the socket, so the code above does work. But it takes a substantial amount of "boot time", and I'm worried that adding more nodes will make it take even longer!
Is there a way of ensuring that this receiver starts on the correct host on the first try?
1 Solution
#1
The custom TCPServerReceiver implementation should also override:
def preferredLocation: Option[String]
Override this to specify a preferred location (hostname).
In this case, something like:
override def preferredLocation: Option[String] = Some(hostname)
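Putting it together, here is a rough sketch of a receiver with the override in place, plus the driver code that registers it. It assumes the Spark Streaming custom receiver API (Receiver, StreamingContext.receiverStream). The names PinnedTCPServerReceiver and PinnedReceiverApp, the 5-second batch interval, and the command-line arguments are placeholders invented for this example. The preferred location is a scheduling hint, so the hostname presumably has to match the name under which Spark knows the executor's host.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.{InetAddress, ServerSocket}

import scala.util.control.NonFatal

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical variant of the question's receiver with the location hint added
class PinnedTCPServerReceiver(hostname: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Ask Spark to start this receiver on the given host
  override def preferredLocation: Option[String] = Some(hostname)

  def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Left empty in this sketch; a blocked accept() is not interrupted here
  def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      val server = new ServerSocket(port, 50, InetAddress.getByName(hostname))
      try {
        while (!isStopped) {
          val s = server.accept()
          try {
            val in = new BufferedReader(new InputStreamReader(s.getInputStream, "UTF-8"))
            var line = in.readLine()
            while (!isStopped && line != null) {
              store(line)
              line = in.readLine()
            }
          } finally {
            s.close()
          }
        }
      } finally {
        server.close() // free the port for a restarted receiver
      }
    } catch {
      case NonFatal(e) => restart("Error receiving data", e)
    }
  }
}

object PinnedReceiverApp {
  def main(args: Array[String]): Unit = {
    // Assumed arguments: <hostname as listed by YARN> <port>
    val hostname = args(0)
    val port = args(1).toInt
    val conf = new SparkConf().setAppName("PinnedReceiverApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.receiverStream(new PinnedTCPServerReceiver(hostname, port))
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Once the job is running, the nc command from the question can be used unchanged to send a test line to the chosen host and port.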