Scala+Future实现异步编程

如题所述

第1个回答  2024-09-05

多核处理器以及并行任务的逐渐普及,人们对异步编程也越来越关注。Scala标准库中提供的Future允许你在得到真正的执行结果之前,就允许通过map,filter等集合操作得到下一个变换之后的异步结果。

我们无需再以阻塞的方式等待每一步结果,而是使用Future快速构造出一个异步的,对一系列不可变的结果的操作流水线出来。

早在笔者之前介绍的Akka,Play等实战Demo中,我们就已经接触过Future了。在本专题中,将详细地介绍如何正确地使用它。

坎坷的线程同步控制

Java为每一个对象关联了逻辑监视器(monitor),用来控制对数据的多线程访问。通过这种模型,我们来决定哪些数据可以被多线程共享,并使用synchronized关键字“加锁”。

要用锁模型来创建一个健壮的多线程应用实际上是一件非常困难的事情。对于每个需要被共享的数据,我们都要为它上锁,并且确保不会引发死锁问题。然而,即便我们主动对数据上锁,它们也不是在编译期间就固定的,在运行期间,程序仍然可以任意地创建新的锁。

后来,Java提供了java.util.concurrent包来提供更高级别的抽象同步,至少要比自己手动通过各种同步语法来实现不稳定的同步机制要来得快。然而,这类工具包仍然是基于共享数据和锁的,因此在本质上没有解决该类模型的种种困难。

在Scala程序中创建第一个Future

ScalaFuture在相当程度上减少了程序员对共享数据进行和锁处理的负担。如果某个函数的执行结果返回的是一个Future,则它意味着将返回另一个要被异步执行的计算,而至于哪个线程来处理这个稍后的异步计算,将由Scala提供的执行上下文(ExecutionContext)来决定。

因此,在使用Future实现异步编程之前,首先需要将执行上下文导入进来:

importscala.concurrent.ExecutionContext.Implicits.global

这几乎是一个必选项,否则,程序在编译时将会报错。我们通过Future伴生对象提供的apply方法创建第一份“未来计划”:

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}

有两种方法确定这个异步计算是否已经得到结果:

调用future.isCompleted,如果异步计算还未执行完毕,则返回false。

调用future.value,如果计算完毕,则返回Some(Sussess(value)),否则返回None。

为什么value方法做了两层包裹呢?首先,需要考虑到这个异步计算是否执行完毕。因此最外层返回的是一个Option类型,如果有计算结果,则返回,否则None。

另外,计算结果也包含了两种情况。如果计算时没有出现错误,则计算结果可以装入Success类中返回。反之,调用value将返回一个Failure。

Try类型

Success和Failure属于Try类,代表着两个异步运算的两个可能结果。它的目的是提供一个在同步计算中类似于try...catch的效果,允许程序员自行处理返回Failure的情况。

在异步编程当中,try/catch语句将不再有效,因为Future计算经常都在别的线程当中执行,而导致原线程当中并不能捕捉到异常。此时,大写的Try类型就排上用场了:如果某个异步计算抛出了Failure,则说明这个计算过程当中出现了一些意外。

对Future进行流式处理

某个异步计算的Future可以通过map,filter等操作衔接到另一个异步计算中。例如:

//后续的代码端中将不再提醒导入执行上下文。importscala.concurrent.ExecutionContext.Implicits.globalvalfuture1:Future[Int]=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)println("执行future1")4}valfuture2:Future[Int]=future1.map(p=>{//两秒后执行这个计算。Thread.sleep(2000)println("执行future2")p+5})

第一个异步计算会在3秒后执行,并返回一个Int值类型。在理想情况下,当第一个异步计算执行完毕后,它的下一步将是把刚才的返回值进行加操作。这个流程又被命名为future2。显然,它的计算返回值仍然是一个Future类型。

对于主线程而言,它将在大约5秒之后得到一个结果:9。

使用for表达式对Future做变换

Scala的for表达式功能要比Java强大得多,包括用于组合Future计算事件。

基于上述的异步运算future1和future2,我们创建第三个运算future3,对刚才的两个运算结果进行加和。代码清单如下:

//后续的代码端中将不再提醒导入执行上下文。importscala.concurrent.ExecutionContext.Implicits.global//观察开始时间println(newjava.util.Date())valfuture1:Future[Int]=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)println("执行future1")4}valfuture2:Future[Int]=Future{Thread.sleep(2000)println("执行future2")5}valfuture3:Future[Int]=for{x<-future1y<-future2}yield{x+y}//我们这里使用了Await等待结果调用完毕,不限制等待时间。println(Await.result(future3,Duration.Inf))//观察结束时间println(newjava.util.Date())

for循环在底层实际上会将这段代码转换为串行化的flatmap句式:future1.faltMap(x=>future2.map(y=>x+y))。从主程序完成到完成计算,总共花费了3秒的时间(而不是5秒),因为上述的代码都是在异步的环境中执行完成的。

我们可以画出一个简单的PETRI图出来,并求出这个图的最短完成时间(详情参考离散数学科目:关键路径的相关知识)。

注意,如果使用for表达式对Future做变换,一定要将Future声明在for循环的前面,否则for表达式将在串行的环境下完成它们。

创建Success,Failure

Future提供诸多已经完成的future的工厂方法:successful,failed以及fromTry。这些方法不需要手动导入上下文。

使用successful方法来创建一个已经完成的future:

valfuture:Future[Int]=Future.successful({println("返回一个已经完成的Success[T]")100})//Some(Success(100))println(future.value)

使用failed方法创建一个已经完成,但是出现异常的future:

valfuture:Future[Nothing]=Future.failed({println("该方法用于返回一个Failure[T]")newException("Oops!")})//Some(Failure(java.lang.Exception:Oops!))println(future.value)

如果不确定抛出Try[+T]的哪一种情况,则调用fromTry:

valfuture:Future[Nothing]=Future.fromTry({println("可能返回Success或者Failure")//Success(100)Failure(newException("Oops!"))})println(future.value)两种等待方式Await同步等待

本文刚才所提到的Await是一种同步等待机制,主线程会在有限的时间内等待某个Future进行。

我们另引入一个包:Scala.concurrent.duration._,这样就允许我们使用2second这种方式来表示我们的最大等待时间了(笔者曾经在隐式转换章节中介绍过如何实现它)。

Await主要有两个方法。第一个用法是调用result另主线程进入阻塞等待,直到获取该future的返回值。

valintFuture=Future{println("正在计算...")println("执行此计算任务的线程是:"+Thread.currentThread().getName)Thread.sleep(1000)30}//主程序会在3秒内等待该结果,并赋值。valint:Int=Await.result(intFuture,3second)println(int)

一般用于需要获取到该future的返回值才能做进一步操作的情况,如果只关心该future的完成状态,可以调用ready方法。当future仍处于工作状态时,主线程会等待至多3秒。

Await.ready(intFuture,3second)

另外,通过Thread.currentThread().getName可以发现,此future是由另一个线程执行的:ForkJoinPool-X-worker-XX。

onComplete异步等待

忠告:如果你已经进入了Future空间内,就尽量不要再使用Await阻塞future的执行。Scala提供注册“回调函数”的方式来令你通过函数副作用获取到某个future在未来返回的值。

valintFuture=Future{println("正在计算...")println("执行此计算任务的线程是:"+Thread.currentThread().getName)Thread.sleep(1000)30}//Await.ready(intFuture,3second)//和刚才的情况不同,如果主线程不阻塞一会,那么这个程序会提前结束推出。Thread.sleep(3000)varintValue:Int=0intFutureonComplete{caseSuccess(value)=>println(value)//通过代码块副作用获取到这个Future的value返回值。intValue=valuecase_=>println("出现了意外的错误")}

这种方式不会阻塞主线程,为了能看到程序运行结果,我们需要主动调用Thread.sleep让主线程休眠一会,否则程序会立刻结束。onComplete的返回值是一个Unit数据类型。

使用andThen强制保证future的执行顺序

一个future可以绑定多个onComplete。然而,上下文环境并不会保证哪个future的onComplete会被率先触发,而andThen方法保证了回调函数的执行顺序。

importscala.concurrent.ExecutionContext.Implicits.globalvalintFuture=Future{Thread.sleep(2000)println(Thread.currentThread().getName)200}//主程序的onComplete方法的调用顺序不一定intFutureonComplete{caseSuccess(int)=>println(s"thisfuturereturned$int")case_=>println("somethingwronghashappened.")}intFutureonComplete{caseSuccess(int)=>println(s"completedwiththevalueof$int")case_=>println("somethingwronghashappened.")}Thread.sleep(3000)

执行上述的程序,控制台有可能先打印thisfuturereturned$int,也有可能先打印completedwiththevalueof$int。

importscala.concurrent.ExecutionContext.Implicits.globalvalintFuture=Future{Thread.sleep(2000)println(Thread.currentThread().getName)200}intFutureonComplete{caseSuccess(int)=>println(s"thisfuturereturned$int")case_=>println("somethingwronghashappened.")}intFutureandThen{caseSuccess(int)=>println(s"completedwiththevalueof$int")case_=>println("somethingwronghashappened.")}Thread.sleep(3000)

andThen方法会返回原future的一个镜像,并且只会在该future调用完onCompelete方法之后,andThen才会执行。

Promise

当我们不确定future何时会完成时,可以会借助Promise许下一个“承诺”,它表示:在某个未来的时间点,一定能够得到值。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}2

然而,这个Int值的计算实际上委托给了其它的future来完成。受托的Future在计算完结果之后会调用该promise的success方法来“兑现”这个承诺。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}3

考虑到异常情况,除了success方法,Promise还提供了failure,Complete等方法。无论调用哪种方法,一个Promise都只能被使用一次。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}4

随后此promise的future会进入就绪状态,我们使用刚才介绍的onComplete回调函数中"兑现"它的返回值。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}5

PromisedInt在这里充当着代理的作用。它承诺提供的值具体要由哪个future来计算并提供,程序的调用者可能并不关心:它也许是intFuture,也许是IntFuture2。因此,我们仅需要为代理(PromisedInt.future)设置回调函数,而不是其它的future。为了方便理解,这里给出连贯的代码清单:

importscala.concurrent.ExecutionContext.Implicits.globalvalfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}2valintFuture=Future{println("正在计算...")println("执行此计算任务的线程是:"+Thread.currentThread().getName)Thread.sleep(1000)//promisedInt承诺的值由intFuture真正实现。promisedInt.success(300)promisedInt.failure(newException("可能的错误"))promisedInt.complete(Success(1))}//和刚才的情况不同,如果主线程不阻塞一会,那么这个程序会提前结束退出。Thread.sleep(3000)//主函数只关心promisedInt能否提供值。promisedInt.futureonComplete{caseSuccess(value)=>println(value)case_=>println("出现了意外的错误")}过滤Future的返回值

Scala提供两种方式让你对future的返回值进行检查,或者过滤。filter方法可以对future的结果进行检验。如果该值合法,就进行保留。下面的例子使用filter确保返回值是满足>=30的值。注意,执行filter方法之后得到的是另一个future值。

importscala.concurrent.ExecutionContext.Implicits.globalvaleventualInt=Future{Thread.sleep(3000)print(s"${Thread.currentThread().getName}:returnresult.")12}//检查返回值是否>=30.valcheckRes:Future[Int]=eventualIntfilter(_>=30)//阻塞等待while(!checkRes.isCompleted){Thread.sleep(1000)println("waiting..")}//注册回调。checkResonComplete{caseSuccess(res)=>println(s"result:$res")caseFailure(cause)=>println(s"failedbecauseof$cause")}

如果不满足匹配的要求,则它会返回java.util.NoSuchElementException:Future.filterpredicateisnotsatisfied。你可以在caseFailure(casue)=>中捕获它。

Future的collect方法允许你使用偏函数对结果进行中间变换,可以使用case语句对偏函数进行缩写。

valfuture=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)200}8处理失败的预期failed方法

Scala提供了几种处理失败的future的方式:包含failed,fallbackTo,recover和recoverWith。举例:如果某个future在执行时出现异常,则failed方法会返回一个成功的Future[Throwable]实例。

importscala.concurrent.ExecutionContext.Implicits.globalvalintFuture=Future{10/0}intFutureonComplete{caseSuccess(int)=>println(int)caseFailure(cause)=>println(s"failedbecauseof$cause")}valeventualThrowable:Future[Throwable]=intFuturefailed//Some(Success(java.lang.ArithmeticException:/byzero))println(eventualThrowable.value)

如果future是被正常执行的,则failed方法反而会抛出NoSuchElement。

fallbackTo方法

fallbackTo方法提供了保险机制,它允许原始的future失败时,转而去运行另一个future2。

//后续的代码端中将不再提醒导入执行上下文。importscala.concurrent.ExecutionContext.Implicits.globalvalfuture1:Future[Int]=Future{//在这个计划中,执行它的线程会首先休眠3秒,然后返回一个Int值。Thread.sleep(3000)println("执行future1")4}valfuture2:Future[Int]=future1.map(p=>{//两秒后执行这个计算。Thread.sleep(2000)println("执行future2")p+5})0

无论intFuture执行是否成功,maybeFailed也总是会运行(笔者亲测),因此不要在这里设置一些具有副作用的代码。当intFuture运行成功时,maybeFailed的返回值将被会忽略,它实际返回的是intFuture的返回值。在intFuture运行失败的情况下,maybeFailed方法的返回值才会生效。

如果maybeFailed在执行时也出现了异常,则它抛出的异常将被忽略,捕捉到的将是上一级intFuture的原始异常。

recover方法

另一个方法recover和fallbackTo方法的逻辑类似:如果它捕获到了异常,则允许你根据异常的类型采取对应的策略并返回值。但是如果调用它的原始future执行成功,则这个备用值同样会被忽略。同理,传入recover的偏函数中如果没有对指定异常的处理,则原始future的异常会被透传。

importscala.concurrent.ExecutionContext.Implicits.globalvalintFuture=Future{10/0}valeventualInt:Future[Int]=intFuturerecover{caseex:ArithmeticException=>100caseex:Exception=>200}intFutureonComplete{caseSuccess(int)=>println(int)caseFailure(cause)=>println(s"failedbecauseof$cause")}Thread.sleep(3000)pr

logo设计

创造品牌价值

¥500元起

APP开发

量身定制,源码交付

¥2000元起

商标注册

一个好品牌从商标开始

¥1480元起

公司注册

注册公司全程代办

¥0元起

    官方电话官方服务
      官方网站八戒财税知识产权八戒服务商企业需求数字市场
相似回答
大家正在搜