本文实例为大家分享了SpringBoot实现动态多线程并发定时任务的具体代码,供大家参考,具体内容如下
实现定时任务有多种方式,使用spring自带的,继承SchedulingConfigurer的方式。
一、实现
1、启动类
在启动类添加注解@EnableScheduling开启,不然不起用做。
2、新建任务类
添加注解@Component注册到spring的容器中。
package com.example.demo.task; import com.example.demo.entity.MyTask; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.CronTask; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.PreDestroy; import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; /** * @path:com.example.demo.task.ScheduledTask.java * @className:ScheduledTask.java * @description:定时任务 * @author:tanyp * @dateTime:2020/7/23 21:37 * @editNote: */ @Slf4j @Component public class ScheduledTask implements SchedulingConfigurer { private volatile ScheduledTaskRegistrar registrar; private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>(); /** * 默认启动10个线程 */ private static final Integer DEFAULT_THREAD_POOL = 10; @Override public void configureTasks(ScheduledTaskRegistrar registrar) { registrar.setScheduler(Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL)); this.registrar = registrar; } @PreDestroy public void destroy() { this.registrar.destroy(); } /** * @methodName:refreshTask * @description:初始化任务 * 1、从数据库获取执行任务的集合【TxTask】 * 2、通过调用 【refresh】 方法刷新任务列表 * 3、每次数据库中的任务发生变化后重新执行【1、2】 * @author:tanyp * @dateTime:2020/7/23 21:37 * @Params: [tasks] * @Return: void * @editNote: */ public void refreshTask(List<MyTask> tasks) { // 删除已经取消任务 scheduledFutures.keySet().forEach(key -> { if (Objects.isNull(tasks) || tasks.size() == 0) { scheduledFutures.get(key).cancel(false); scheduledFutures.remove(key); cronTasks.remove(key); return; } tasks.forEach(task -> { if (!Objects.equals(key, task.getTaskId())) { scheduledFutures.get(key).cancel(false); scheduledFutures.remove(key); cronTasks.remove(key); return; } }); }); // 添加新任务、更改执行规则任务 tasks.forEach(txTask -> { String expression = txTask.getExpression(); // 任务表达式为空则跳过 if (StringUtils.isEmpty(expression)) { return; } // 任务已存在并且表达式未发生变化则跳过 if (scheduledFutures.containsKey(txTask.getTaskId()) && cronTasks.get(txTask.getTaskId()).getExpression().equals(expression)) { return; } // 任务执行时间发生了变化,则删除该任务 if (scheduledFutures.containsKey(txTask.getTaskId())) { scheduledFutures.get(txTask.getTaskId()).cancel(false); scheduledFutures.remove(txTask.getTaskId()); cronTasks.remove(txTask.getTaskId()); } CronTask task = new CronTask(new Runnable() { @Override public void run() { // 执行业务逻辑 try { log.info("执行单个任务,任务ID【{}】执行规则【{}】", txTask.getTaskId(), txTask.getExpression()); System.out.println("==========================执行任务============================="); } catch (Exception e) { log.error("执行发送消息任务异常,异常信息:{}", e); } } }, expression); ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger()); cronTasks.put(txTask.getTaskId(), task); scheduledFutures.put(txTask.getTaskId(), future); }); } }
3、创建自启动任务类
package com.example.demo.task; import com.example.demo.task.ScheduledTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.List; /** * @path:com.example.demo.task.MyApplicationRunner.java * @className:ScheduledTask.java * @description:自启动 * @author:tanyp * @dateTime:2020/7/23 21:37 * @editNote: */ @Slf4j @Component public class MyApplicationRunner implements ApplicationRunner { @Autowired private ScheduledTask scheduledTask; @Override public void run(ApplicationArguments args) throws Exception { log.info("================项目启动初始化定时任务====开始==========="); /** * 初始化三个任务: * 1、10秒执行一次 * 2、15秒执行一次 * 3、20秒执行一次 */ List<MyTask> tasks = Arrays.asList( MyTask.builder().taskId("10001").expression("*/10 * * * * ?").build(), MyTask.builder().taskId("10002").expression("*/15 * * * * ?").build(), MyTask.builder().taskId("10003").expression("*/20 * * * * ?").build() ); scheduledTask.refreshTask(tasks); log.info("================项目启动初始化定时任务====完成=========="); } }
4、实体
package com.example.demo.entity; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; /** * @path:com.example.demo.entity.MyTask.java * @className:MyTask.java * @description:任务实体 * @author:tanyp * @dateTime:2020/7/23 21:41 * @editNote: */ @Data @Builder @AllArgsConstructor @NoArgsConstructor public class MyTask { /** * 任务id */ private String taskId; /** * 任务执行规则时间 */ private String expression; }
二、测试
初始化三个任务,分别为:
10秒执行一次(*/10 * * * * ?)
15秒执行一次(*/15 * * * * ?)
20秒执行一次(*/20 * * * * ?)
测试效果:
可以看到初始化的三个任务都在执行,并且是不用的线程在执行。
三、动态使用方式
1、启动方式有两种:
启动项目后,手动调用ScheduledTask.refreshTask(List tasks),并初始化任务列表; 使用我测试中的方式,配置项目启动完成后自动调用初始任务的方法,并初始化任务列表。2、数据初始化
只需要给 List集合赋值并调用refreshTask()方法即可:
根据业务需求修改MyTask实体类; 这里的初始化数据可以从数据库读取数据赋值给集合;例如:从mysql读取任务配置表的数据,调用refreshTask()方法。
3、如何动态?
修改:修改某一项正在执行的任务规则; 添加:添加一项新的任务; 删除:停止某一项正在执行的任务。例如:我们有一张任务配置表,此时进行分别新增一条或多条数据、删除一条或多条数据、改一条数据,只需要完成以上任何一项操作后,重新调用一下refreshTask()方法即可。
怎么重新调用 refreshTask()方法:可以另外启一个任务实时监控任务表的数据变化。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。