The Scala documentation has an example of how to use promises to pick whichever of two futures succeeds first:
http://docs.scala-lang.org/overviews/core/futures.html#promises
def first[T](f: Future[T], g: Future[T]): Future[T] = {
  val p = promise[T]
  f onSuccess {
    case x => p.trySuccess(x)
  }
  g onSuccess {
    case x => p.trySuccess(x)
  }
  p.future
}
This function returns the result of the future that succeeds first, and if either one of them fails, it never completes.
Is it possible to modify this so that, if one future fails, the other one is still returned when it succeeds, and if both succeed, the faster one is picked as the code does now?
4 solutions
#1
You can add this:
f onFailure {
  case e =>
    g onFailure {
      case _ =>
        p.failure(e)
    }
}
When both futures have failed, this fails the promise with the same exception as f. If necessary, you can elaborate on this to create an exception that records both exceptions, the one from f and the one from g.
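As a rough sketch of that elaboration, the version below completes the promise with the first success and, only when both futures have failed, with a wrapper exception that keeps both causes. It uses `foreach` and `failed` instead of `onSuccess`/`onFailure`, which are deprecated in newer Scala versions. The object name `FirstSuccessful`, the `RuntimeException` wrapper, and its message are assumptions made for illustration, not part of the original answer.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstSuccessful {
  def first[T](f: Future[T], g: Future[T])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    // Whichever future succeeds first completes the promise; the later one is ignored.
    f.foreach(p.trySuccess)
    g.foreach(p.trySuccess)
    // Only when f AND g have both failed is the promise failed.
    f.failed.foreach { ef =>
      g.failed.foreach { eg =>
        // Hypothetical wrapper: f's exception is the cause, g's is attached as suppressed.
        val both = new RuntimeException("both futures failed", ef)
        both.addSuppressed(eg)
        p.tryFailure(both)
      }
    }
    p.future
  }
}
```

For example, `FirstSuccessful.first(Future.failed(new Exception("a")), Future.successful(2))` completes with `2`, while two failed futures produce a failed future whose exception carries both originals.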
#2
I recommend following Alvin Alexander's advice on futures and promises in Scala. I believe this is the best way to work with futures:
package futures

import scala.concurrent.{Future => ConcurrentTask} // rename
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import Utils.sleep // helper that is not defined in this snippet

object FutureAsConcurrentTask extends App {

  // run some long-running task (task has type Future[Int] in this example)
  // Cloud.executeLongRunningTask is also not defined in this snippet
  val task = ConcurrentTask {
    Cloud.executeLongRunningTask
  }

  // whenever the task completes, execute this code
  task.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => println(s"D'oh! The task failed: ${e.getMessage}")
  }

  // do your other work
  println("A ..."); sleep(100)
  println("B ..."); sleep(100)
  println("C ..."); sleep(100)
  println("D ..."); sleep(100)
  println("E ..."); sleep(100)
  println("F ..."); sleep(100)
}
#3
Here is a basic pattern for picking the fastest future, or timing out if they are all too slow:
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{ Success, Failure }
import akka.actor._
import akka.pattern.after

object GetFastestFutureOrTimeout extends App {
  val e = new TimeoutException("TimeoutException")
  val system = ActorSystem("GetFastestFutureOrTimeout")
  val f1 = Future { Thread.sleep(200); "this is f1" }
  val f2 = Future { Thread.sleep(100); "this is f2" }
  // fails with e after 500 ms, so it only wins if f1 and f2 are both slower than that
  val timeoutFuture = after(500.milliseconds, using = system.scheduler) { Future.failed(e) }
  val f = Future.firstCompletedOf(f1 :: f2 :: timeoutFuture :: Nil)
  f onComplete {
    case Success(msg) => println(msg)
    case Failure(err) => println("An error occurred: " + err.getMessage)
  }
}
This prints "this is f2". If the timeout of timeoutFuture were changed to 50 milliseconds, it would print "An error occurred: TimeoutException".
Under the hood, firstCompletedOf uses a Promise to return the result of the first Future that completes; see https://github.com/scala/scala/blob/v2.11.6/src/library/scala/concurrent/Future.scala#L503.
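The Promise-based idea can be sketched roughly as follows. This is a simplified illustration, not the library source verbatim, and the names `FirstCompleted` and `firstCompleted` are hypothetical. It also shows why this pattern differs from the `first` function in the question: `tryComplete` accepts failures as well as successes, so a future that fails quickly wins over one that succeeds later.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstCompleted {
  // Simplified sketch: the first future to complete, with a success OR a failure,
  // decides the result; every later completion is ignored by tryComplete.
  def firstCompleted[T](futures: Seq[Future[T]])(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    futures.foreach(_.onComplete(p.tryComplete))
    p.future
  }
}
```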
#4
This is a basic implementation that picks the fastest successful response, or fails if all of them fail:
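import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

def getFirstSuccessfulResultOrFail[T](requests: List[Future[T]]): Future[T] = {
  val p = Promise[T]()
  // number of requests that have failed so far
  val failureCount = new AtomicInteger(0)
  requests.foreach { f =>
    f.onComplete {
      // fail the promise only once every request has failed, using the last exception seen
      case Failure(e) => if (failureCount.incrementAndGet() == requests.size) p.tryFailure(e)
      // the first success wins; later successes are ignored
      case Success(s) => p.trySuccess(s)
    }
  }
  // note: for an empty list the returned future never completes
  p.future
}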