概述

下面均使用并模拟了两个task2:一个在Future中抛出一次异常的task1(Future使用了onComplete)、一个在Future内外均各抛出一次异常的task2(Future使用了onComplete)

package io.growing.schedule

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.Failure

/**
 *
 * @author liguobin@growingio.com
 * @version 1.0,2019/9/17
 */
object UserTask {

  //仅load内部抛出运行时异常
  def task1(): Unit = {
    println("我开始执行任务1")
    load(1).map { x =>
      println("任务1-我在执行更新数据 x: " + x)
    }
  }

  private var j = 0

  //load内部与外部各抛一次运行时异常
  def task2() = {
    println("我开始执行任务2")
    load(2).map { x =>
      println("任务2-我在执行更新数据 x: " + x)
    }
    j += 1
    //模拟仅抛出一次异常
    if (j == 1) {
      throw new Exception("Future 外抛出的异常") 
    }
  }

  //@see https://docs.scala-lang.org/zh-cn/overviews/core/futures.html
  //当Future带着某个值就位时,我们就说该Future携带计算结果成功就位。
  //当Future因对应计算过程抛出异常而就绪,我们就说这个Future因该异常而失败。
  //如果此Future以异常完成,那么新的Future也将包含此异常
  private var i = 0

  private def load(name: Int) = {
    val f = Future {
      i += 1
      if (i < 2) {
        println(s"任务$name-我正在抛出异常")
        throw new Exception("Future 内抛出的异常")
      }
      1
    }
    f.onComplete {
      case Failure(t) => println(s"任务$name-load方法捕获到错误了")
      case _ =>
    }
    f
  }

}

Java Timer

Timer是一个比较老的库,从Java1.3版本开始的,并且是线程安全的,其内部均使用synchronized

基本原理

  • 其内部将任务TimerTask(Runnable)传递进优先级队列数据结构TaskQueue,其中优先级以下次被执行时间为值构造小根堆(nextExecutionTime,由fixUp、fixDown进行上下调整)。
  • 执行schedule方法时,task的下次执行时间已经根据传进的delay+当前系统时间戳计算好。并将TimerTask入队,然后获取小根堆的根节点,如果是当前节点则直接将它唤醒(根节点表示离将被执行时间最近的任务,如果小根堆是当前任务,那么当前任务就是马上被执行的任务)。
  • 初始化Timer对象时,会启动TimerThread线程,并将整个任务队列传进去,所有待调度的任务在队列中等待被执行。TimerThread这个相当于整个任务的调度线程。
  • TimerThread线程的run方法会执行mainLoop方法,整个方法是一个死循环,不停的从队列取出可执行的任务,被取消的任务将从队列中删除。其中执行时还会判断当前时间戳与任务入队时的nextExecutionTime,只有在这个时间小于等于当前时间戳时,任务才会kiss被调度,否则不会被处理并被要求开始wait。在这个处理过程中mainLoop方法只会处理中断异常InterruptedException,任何其他异常都将导致任务被取消,并退出mainLoop方法的死循环,任务调度线程直接结束。

Timer主要特性总结

  • Timer 是基于绝对时间的,对系统时间比较敏感
  • Timer 是内部是单一线程,调度线程挂了任务全挂
  • Timer 运行多个 TimeTask 时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,无法在异常出现后继续恢复执行
  • Timer 执行线程默认不是 daemon 线程, 任务执行完,主线程(或其他启动定时器的线程)结束时,task 线程并没有结束。需要注意潜在内存泄漏问题
  • 内部使用了大量的synchronized和锁来保证线程安全性
  • Timer内部优先级队列大小默认是128个,满时扩容为当前容量的2倍,并向上调整

代码示例1

object TimerTest3 extends App {

  pull()

  lazy val timer = new Timer()

  //这里使用main方法测试,使用val不会被初始化,除非加lazy @see https://www.jianshu.com/p/f99e09effea4
  def LongServerTask: TimerTask = new TimerTask {
    override def run(): Unit = {
      UserTask.task1()
    }
  }

  def pull(): Unit = {
    timer.schedule(LongServerTask, 3000, 3000)
    timer.schedule(updateHotDataTask, 3000, 3000)
  }

  def updateHotDataTask: TimerTask = new TimerTask {
    override def run(): Unit = {
      UserTask.task2()
    }
  }
}

结果1

task1和task2的Future中的异常都被吃掉了,task2的异常由于Future外部没有onComplete捕获,会导致task被取消,所有定时任务直接结束,main方法停止。

Java ScheduledExecutorService

基于上述Timer的设计缺陷以及功能上的不足,不引入第三方库的前提下,可以使用现有Java类库的ScheduledExecutorService类。该类是一个接口,从Java1.5开始,继承自ExecutorService接口

我们有两种方法创建:

  • 通过Executors的静态方法newScheduledThreadPool来创建自己的定时任务,只需传入核心线程数量、(ThreadFactory可选)
  • 通过new ScheduledThreadPoolExecutor实例来创建,同样也只需传入核心线程数量、(ThreadFactory可选),甚至可以使用自定义的拒绝策略。实际上Executors仅对ExecutorService的子类作了封装,以便更快更简单创建一个线程池

ScheduledThreadPoolExecutor的创建

两方法的比较:线程池不推荐使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的人更加明确线程池的运行规则,规避资源耗尽的风险

  • newFixedThreadPool和newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
  • newCachedThreadPool和newScheduledThreadPool: 主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。 Scheduled线程池能按时间计划来执行任务,允许用户设定计划执行任务的时间,int类型的参数是设定 线程池中线程的最小数目。当任务较多时,线程池可能会自动创建更多的工作线程来执行任务

ScheduledThreadPoolExecutor的启动

  • scheduleAtFixedRate  固定频率   下一次执行时间相当于是上一次的执行时间加上period
  • scheduleWithFixedDelay  固定周期   下一次执行时间是上一次任务执行完的系统时间加上period,因而具体执行时间不是固定的,但周期是固定的,是采用相对固定的延迟来执行任务

ScheduledThreadPoolExecutor的优点

  • 基于相对时间,即使是固定周期时也是基于纳秒的时间,比毫秒更加精确。
  • 内部是个线程池,所以可以支持多个任务并发执行,Timer就一个任务调度线程
  • 拥有拒绝策略,支持异常捕获后的处理,可自行扩展(只有通过execute提交的任务,才能将它抛出的异常交给UncaughtExceptionHandler,而通过submit提交的任务,无论是抛出的未检测异常还是已检查异常,都将被认为是任务返回状态的一部分,可以捕获Future.get)
  • 使用可重入锁以及CAS而非synchronized关键字

代码示例2

package io.growing.schedule

import java.util.concurrent.{ Executors, _ }


/**
 * @see https://www.jianshu.com/p/925dba9f5969
 * @author liguobin@growingio.com
 * @version 1.0,2019/9/16
 */
object ScheduledThreadPoolExecutorTest extends App {

  pull()

  lazy val delegate = Executors.defaultThreadFactory
  lazy val tf = new ThreadFactory() {
    override def newThread(r: Runnable): Thread = {
      val res = delegate.newThread(r)
      res.setUncaughtExceptionHandler((t: Thread, e: Throwable) => {
        println("uncaughtException捕获到异常了")
        e.printStackTrace()
      })
      res
    }
  }

  lazy val executor = Executors.newScheduledThreadPool(5, tf)
 
  lazy val task1 = new Runnable {
    override def run(): Unit = {
      UserTask.task1()
    }
  }

  lazy val task2 = new Runnable {
    override def run(): Unit = {
      UserTask.task2()
    }
  }

  def pull(): Unit = {
    executor.scheduleWithFixedDelay(task1, 6, 6, TimeUnit.SECONDS)
    executor.scheduleWithFixedDelay(task2, 6, 6, TimeUnit.SECONDS)
  }

}

结果2

task1和task2的Future中的异常都被吃掉了,任务继续执行,task2的异常由于Future外部没有onComplete捕获,但不影响task1的执行,task1仍能继续工作,但task2无法被继续执行。基于此种方法更新缓存,需要自己实现重试策略。

Akka Actor scheduler

其原理使用是系统actor的scheduler调度器定时发送消息给其他actor以触发定时任务的执行。同样,akka定时任务也执行传入两个时间、第一次延迟时间和后续间隔时间,以及消息接受者和消息内容。

Akka2.5.3的官方文档:https://doc.akka.io/docs/akka/2.5.3/scala/scheduler.html,文档讲的很清楚,就不复制了。

代码示例3

package io.growing.schedule

import java.util.concurrent.TimeUnit

import akka.actor.{ Actor, ActorRef, ActorSystem, Props }

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/**
 *
 * @author liguobin@growingio.com
 * @version 1.0,2019/9/17
 */
class JobScheduler(jobActor: ActorRef, system: ActorSystem)(implicit ec: ExecutionContext) {
  def start() {
    //两个定时任务,task1,Future内部异常被捕获
    // task2,Future内部异常被捕获,且Future外还抛出异常,两个task的异常都只会出现一次。
    system.scheduler.schedule(0 seconds, Duration.create(1, TimeUnit.SECONDS), jobActor, "task1")
    system.scheduler.schedule(0 seconds, Duration.create(3, TimeUnit.SECONDS), jobActor, "task2")
  }
}

class JobActor extends Actor {
  def receive = {
    case "task1" => {
      println("receive message from task1...")
      UserTask.task1()
    }
    case "task2" => {
      println("receive message from task2...")
      UserTask.task2()
    }
  }
}

object TestJobScheduler extends App {

  implicit val ec = ExecutionContext.global
  val system = ActorSystem.apply("schedulerJob")
  val jobActor = system.actorOf(Props[JobActor])
  val schedule = new JobScheduler(jobActor, system)
  schedule.start()
}

结果3

Future内抛出的异常被onComplete吃掉了,虽然外部的异常使得akka出错,但是下次任务仍会继续执行,符合需求预期。另外:错误次数呢?

补充

值得一提的是,根据官方文档说明,使用Runnable的定时任务在遇到异常时同样会取消当前task,与ScheduledThreadPoolExecutor表现一致

参见官方文档:https://doc.akka.io/docs/akka/2.5.3/scala/scheduler.html

下面这个定时任务实现方式,一旦task2中抛出未被捕获的异常,整个schedule就不会再被调用,定时任务中止。

//If the `Runnable` throws an exception the repeated scheduling is aborted,
//i.e. the function will not be invoked any more.
system.scheduler.schedule(0 seconds, Duration.create(3, TimeUnit.SECONDS), new Runnable {
    override def run(): Unit = {
        UserTask.task2()
        }
})