昨天我们实现了学习计划和学习进度的统计功能。特别是学习进度部分,为了更精确的记录用户上一次播放的进度,我们采用的方案是:前端每隔15秒就发起一次请求,将播放记录写入数据库。
但问题是,提交播放记录的业务太复杂了,其中涉及到大量的数据库操作:
今天我们就来分析一下,当碰到高并发的数据库写业务时,该如何优化。通过今天的学习,大家可以掌握下面的技能:
解决高并发问题从宏观角度来说有3个方向:
因此,我们本章重点讨论如何通过编码来提供业务的单机并发能力。
在机器性能一定的情况下,提高单机并发能力就是要尽可能缩短业务的响应时间(ResponseTime),而对响应时间影响最大的往往是对数据库的操作。而从数据库角度来说,我们的业务无非就是读或写两种类型。
对于读多写少的业务,其优化手段大家都比较熟悉了,主要包括两方面:
对于高并发写的优化方案有:
代码和SQL优化与读优化类似,我们就不再赘述了,接下来我们着重分析一下变同步为异步、合并写请求两种优化方案。
假如一个业务比较复杂,需要有多次数据库的写业务,如图所示:
优化的思路很简单,我们之前讲解MQ的时候就说过,利用MQ可以把同步业务变成异步,从而提高效率。
优点:
缺点:
合并写请求方案其实是参考高并发读的优化思路:当读数据库并发较高时,我们可以把数据缓存到Redis,这样就无需访问数据库,大大减少数据库压力,减少响应时间。
既然读数据可以建立缓存,那么写数据可以不可以也缓存到Redis呢?
答案是肯定的,合并写请求就是指:
如图:
优点:
播放进度统计包含大量的数据库读、写操作。不过保存播放记录还是以写数据库为主。因此优化的方向还是以高并发写优化为主。
大家思考一下,针对播放进度记录业务来说,应该采用哪种优化方案呢?
虽然播放进度记录业务较为复杂,但是我们认真思考一下整个业务分支:
也就是说,95%的请求都是在更新learning_record表中的moment字段,以及learning_lesson表中的正在学习的小节id和时间。
而播放进度信息,不管更新多少次,下一次续播肯定是从最后的一次播放进度开始续播。也就是说我们只需要记住最后一次即可。因此可以采用合并写方案来降低数据库写的次数和频率,而异步写做不到。
综上,提交播放进度业务虽然看起来复杂,但大多数请求的处理很简单,就是更新播放进度。并且播放进度数据是可以合并的(覆盖之前旧数据)。我们建议采用合并写请求方案:
我们先讨论下Redis缓存中需要记录哪些数据。
我们的优化方案要处理的不是所有的提交学习记录请求。仅仅是视频播放时的高频更新播放进度的请求,对应的业务分支如图:
这条业务支线的流程如下:
这里有多次数据库操作,例如:
一方面我们要缓存写数据,减少写数据库频率;另一方面我们要缓存播放记录,减少查询数据库。因此,缓存中至少要包含3个字段:
既然一个小节要保存多个字段,是不是可以考虑使用Hash结构来保存这些数据,如图:
不过,这样设计有一个问题。课程有很多,每个课程的小节也非常多。每个小节都是一个独立的KEY,需要创建的KEY也会非常多,浪费大量内存。
而且,用户学习视频的过程中,可能会在多个视频之间来回跳转,这就会导致频繁的创建缓存、缓存过期,影响到最终的业务性能。该如何解决呢?
既然一个课程包含多个小节,我们完全可以把一个课程的多个小节作为一个hashKEY来缓存,value以json表示 如图:
这样做有两个好处:
添加缓存以后,学习记录提交的业务流程就需要发生一些变化了,如图:
变化后的业务具体流程为:
对于合并写请求方案,一定有一个步骤就是持久化缓存数据到数据库。一般采用的是定时任务持久化:
但是定时任务的持久化方式在播放进度记录业务中存在一些问题,主要就是时效性问题。我们的产品要求视频续播的时间误差不能超过30秒。
如果产品对于时间误差要求不高,定时任务处理是最简单,最可靠的一种方案,推荐大家使用。
但问题来了,我们怎么知道哪一次提交是最后一次提交呢?
只要用户一直在提交记录,Redis中的播放进度就会一直变化。如果Redis中的播放进度不变,肯定是停止了播放,是最后一次提交。
因此,我们只要能判断Redis中的播放进度是否变化即可。怎么判断呢?
每当前端提交播放记录时,我们可以设置一个延迟任务并保存这次提交的进度。等待20秒后(因为前端每15秒提交一次,20秒就是等待下一次提交),检查Redis中的缓存的进度与任务中的进度是否一致。
为了确定用户提交的播放记录是否变化,我们需要将播放记录保存为一个延迟任务,等待超过一个提交周期(20s)后检查播放进度。
那么延迟任务该如何实现呢?
延迟任务的实现方案有很多,常见的有四类:
以上四种方案都可以解决问题,不过本例中我们会使用DelayQueue方案。因为这种方案使用成本最低,而且不依赖任何第三方服务,减少了网络交互。
但缺点也很明显,就是需要占用JVM内存,在数据量非常大的情况下可能会有问题。但考虑到任务存储时间比较短(只有20秒),因此也可以接收。
如果你们的数据量非常大,DelayQueue不能满足业务需求,大家也可以替换为其它延迟队列方式,例如Redisson、MQ等。
首先来看一下DelayQueue的源码:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
// ... 略
}
可以看到DelayQueue实现了BlockingQueue接口,是一个阻塞队列。队列就是容器,用来存储东西的。DelayQueue叫做延迟队列,其中存储的就是延迟执行的任务。
我们可以看到DelayQueue的泛型定义:
DelayQueue<E extends Delayed>
这说明存入DelayQueue内部的元素必须是Delayed类型,这其实就是一个延迟任务的规范接口。来看一下:
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
===============================================================================
public interface Comparable<T> {
public int compareTo(T o);
}
从源码中可以看出,Delayed类型必须具备两个方法:
可见,Delayed类型的延迟任务具备两个功能:获取剩余延迟时间、比较执行顺序。
当然,我们可以对Delayed做实现和功能扩展,比如添加延迟任务的数据。
首先定义一个Delayed类型的延迟任务类,要能保持任务数据。
@Data
public class DelayTask<D> implements Delayed {
private D data;
private long deadlineNanos;
public DelayTask(D data, Duration delayTime) {
this.data = data;
this.deadlineNanos = System.nanoTime() + delayTime.toNanos();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(Math.max(0, deadlineNanos - System.nanoTime()), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {
long l = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
if(l > 0){
return 1;
}else if(l < 0){
return -1;
}else {
return 0;
}
}
}
接下来就可以创建延迟任务,交给延迟队列保存:
@Slf4j
class DelayTaskTest {
@Test
void testDelayQueue() throws InterruptedException {
// 1.初始化延迟队列
DelayQueue<DelayTask<String>> queue = new DelayQueue<>();
// 2.向队列中添加延迟执行的任务
log.info("开始初始化延迟任务。。。。");
queue.add(new DelayTask<>("延迟任务3", Duration.ofSeconds(3)));
queue.add(new DelayTask<>("延迟任务1", Duration.ofSeconds(1)));
queue.add(new DelayTask<>("延迟任务2", Duration.ofSeconds(2)));
// 3.尝试执行任务
while (true) {
DelayTask<String> task = queue.take();
log.info("开始执行延迟任务:{}", task.getData());
}
}
}
注意:
接下来,我们就可以按照之前分析的方案来改造代码了。
首先,我们要定义一个工具类,帮助我们改造整个业务。在提交学习记录业务中,需要用到异步任务和缓存的地方有以下几处:
因此,我们的工具类就应该具备上述4个方法:
工具类代码如下:
@Slf4j
@Component
@RequiredArgsConstructor
public class LearningRecordDelayTaskHandler {
@Data
@NoArgsConstructor
private static class RecordCacheData{
//这是放到redis的hash中的value数据
private Long id; //学习记录的id
private Integer moment;//进度
private Boolean finished;//是否完成
public RecordCacheData(LearningRecord record) {
this.id = record.getId();
this.moment = record.getMoment();
this.finished = record.getFinished();
}
}
@Data
@NoArgsConstructor
private static class RecordTaskData{
//这是放到延迟队列中的数据
private Long lessonId; //课程id
private Long sectionId;//小节id
private Integer moment;//进度
public RecordTaskData(LearningRecord record) {
this.lessonId = record.getLessonId();
this.sectionId = record.getSectionId();
this.moment = record.getMoment();
}
}
private final StringRedisTemplate redisTemplate;
private final LearningRecordMapper recordMapper;
private final ILearningLessonService lessonService;
private final DelayQueue<DelayTask<RecordTaskData>> queue = new DelayQueue<>();
private final static String RECORD_KEY_TEMPLATE = "learning:record:{}";
private static volatile boolean begin = true;
@PostConstruct //注解:项目启动后 当前类实例化属性注入之后方法就会运行,做初始化工作
public void init(){
//开启一个新线程执行handleDelayTask方法
CompletableFuture.runAsync(this::handleDelayTask);
}
@PreDestroy
public void destroy(){ //当前类实例销毁之前 该方法执行
begin = false;
log.debug("延迟任务停止执行!");
}
//init()和destroy()方法就是为了控制handleDelayTask()方法的开启和关闭
public void handleDelayTask(){
while (begin) {//初始为true 类实例销毁 destroy方法置为false while不再循环也就不再占用CPU资源
try {
// 1.获取到期的延迟任务 take是一个阻塞方法 会一直等待队列的消息
DelayTask<RecordTaskData> task = queue.take();
//拿到消息data (包含三个属性:lessonId(课程id);sectionId(小节id);moment(进度时间))
RecordTaskData data = task.getData();
// 2.通过data查询Redis缓存
LearningRecord record = readRecordCache(data.getLessonId(), data.getSectionId());
if (record == null) {
continue;
}
// 3.比较redis中 和 队列中的moment值
if(!Objects.equals(data.getMoment(), record.getMoment())) {
// 不一致,说明用户还在持续提交播放进度,放弃旧数据
continue;
}
// 4.一致,说明在这20S内都没更新进度(因为前端15S传一次moment),即停止了播放。
// 持久化播放进度数据到数据库
// 4.1.更新学习记录的moment 到mysql
record.setFinished(null);
recordMapper.updateById(record);
// 4.2.更新课表最近学习信息 到mysql
LearningLesson lesson = new LearningLesson();
lesson.setId(data.getLessonId());//课程id
lesson.setLatestSectionId(data.getSectionId());//小节id
lesson.setLatestLearnTime(LocalDateTime.now());//进度
lessonService.updateById(lesson);
} catch (Exception e) {
log.error("处理延迟任务发生异常", e);
}
}
}
public void addLearningRecordTask(LearningRecord record){
// 1.添加数据到Redis缓存
writeRecordCache(record);
// 2.提交延迟任务到延迟队列DelayQueue(record是个实体类 这里添加到队列只取三个值:
// 课程id、小节id、进度 (可看RecordTaskData类的属性就这三个)
// 设置延迟任务20秒
queue.add(new DelayTask<>(new RecordTaskData(record), Duration.ofSeconds(20)));
}
public void writeRecordCache(LearningRecord record) {
log.debug("更新学习记录的缓存数据");
try {
// 1.数据转换 RecordCacheData类 只取record学习记录实体类的三个属性:
// id;moment;finished;
String json = JsonUtils.toJsonStr(new RecordCacheData(record));
// 2.写入Redis
String key = StringUtils.format(RECORD_KEY_TEMPLATE, record.getLessonId());
redisTemplate.opsForHash().put(key, record.getSectionId().toString(), json);
// 3.添加缓存过期时间1分钟 只要大于20秒就行
redisTemplate.expire(key, Duration.ofMinutes(1));
} catch (Exception e) {
log.error("更新学习记录缓存异常", e);
}
}
public LearningRecord readRecordCache(Long lessonId, Long sectionId){
try {
// 1.根据课程id和小节id 读取Redis数据
String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId);
Object cacheData = redisTemplate.opsForHash().get(key, sectionId.toString());
if (cacheData == null) {
return null;
}
// 2.数据检查和转换
return JsonUtils.toBean(cacheData.toString(), LearningRecord.class);
} catch (Exception e) {
log.error("缓存读取异常", e);
return null;
}
}
public void cleanRecordCache(Long lessonId, Long sectionId){
// 删除数据
String key = StringUtils.format(RECORD_KEY_TEMPLATE, lessonId);
redisTemplate.opsForHash().delete(key, sectionId.toString());
}
}
接下来,改造提交学习记录的功能:
@Service
@RequiredArgsConstructor
public class LearningRecordServiceImpl extends ServiceImpl<LearningRecordMapper, LearningRecord> implements ILearningRecordService {
// 学生课程表 服务接口
private final ILearningLessonService lessonService;
//构造注入对象 为了远程调用课程服务
private final CourseClient courseClient;
//延迟队列工具类
private final LearningRecordDelayTaskHandler taskHandler;
//提交学习记录
@Override
@Transactional
public void addLearningRecord(LearningRecordFormDTO recordFormDTO){
//1 获取登录用户
Long userId = UserContext.getUser();
//2 处理学习记录
boolean finished = false;
if(recordFormDTO.getSectionType().equals( SectionType.VIDEO)){
//判断当前小节类型 如果是视频 执行处理视频逻辑 对应流程图的右边
finished = handleVideoRecord(userId, recordFormDTO);
}else{//如果是考试类型 处理考试 对应流程图左边
finished = handleExamRecord(userId, recordFormDTO);
}
if (!finished) {
// 没有新学完的小节,无需更新课表中的学习进度 直接返回
//finished为false情况1:redis和mysql中都没查到学习记录,走新增学习记录那条线
// 情况2:不是第一次学完 将学习记录写入到缓存。对应流程图序号①
return;
}
// 3.finished为true 处理课表数据
//为true情况1:是第一次学完,需要更新学习记录
//情况2:学习记录是考试
handleLearningLessonsChanges(recordFormDTO);
}
private void handleLearningLessonsChanges(LearningRecordFormDTO recordDTO) {
// 1.查询课表
LearningLesson lesson = lessonService.getById(recordDTO.getLessonId());
if (lesson == null) {
throw new BizIllegalException("课程不存在,无法更新数据!");
}
// 2.判断是否有新的完成小节
boolean allLearned = false;
// 3.如果有新完成的小节,则需要查询课程数据
CourseFullInfoDTO cInfo = courseClient.getCourseInfoById(lesson.getCourseId(), false, false);
if (cInfo == null) {
throw new BizIllegalException("课程不存在,无法更新数据!");
}
// 4.比较课程是否全部学完:已学习小节 >= 课程总小节
allLearned = lesson.getLearnedSections() + 1 >= cInfo.getSectionNum();
// 5.更新课表
lessonService.lambdaUpdate()
.set(lesson.getLearnedSections() == 0, LearningLesson::getStatus, LessonStatus.LEARNING.getValue())
.set(allLearned, LearningLesson::getStatus, LessonStatus.FINISHED.getValue())
.set(allLearned, LearningLesson::getUpdateTime, LocalDateTime.now())
.setSql("learned_sections = learned_sections + 1")
.eq(LearningLesson::getId, lesson.getId())
.update();
}
private boolean handleVideoRecord(Long userId, LearningRecordFormDTO recordDTO) {
// 1.查询旧的学习记录
LearningRecord old = queryOldRecord(recordDTO.getLessonId(), recordDTO.getSectionId());
// 2.判断是否存在
if (old == null) {
// 3.不存在,则新增
// 3.1.转换PO
LearningRecord record = BeanUtils.copyBean(recordDTO, LearningRecord.class);
// 3.2.填充数据
record.setUserId(userId);
// 3.3.写入数据库
boolean success = save(record);
if (!success) {
throw new DbException("新增学习记录失败!");
}
return false;
}
// 4.存在,则更新
// 4.1.判断是否是第一次学完(前一次记录是未完成 并且 观看时长大于一半)
boolean finished = !old.getFinished() && recordDTO.getMoment() * 2 >= recordDTO.getDuration();
if (!finished) { //如果不是第一次学完 将学习记录写入到缓存
LearningRecord record = new LearningRecord();
record.setLessonId(recordDTO.getLessonId());
record.setSectionId(recordDTO.getSectionId());
record.setMoment(recordDTO.getMoment());
record.setId(old.getId());
record.setFinished(old.getFinished());
taskHandler.addLearningRecordTask(record);
return false; //返回
}
// 4.2.是第一次学完 则更新数据并清理缓存(为了保持redis和mysql数据一致)
boolean success = lambdaUpdate()
.set(LearningRecord::getMoment, recordDTO.getMoment())
.set(LearningRecord::getFinished, true)
.set(LearningRecord::getFinishTime, recordDTO.getCommitTime())
.eq(LearningRecord::getId, old.getId())
.update();
if (!success) {
throw new DbException("更新学习记录失败!");
}
// 4.3.清理缓存
taskHandler.cleanRecordCache(recordDTO.getLessonId(), recordDTO.getSectionId());
return true; //返回
}
private LearningRecord queryOldRecord(Long lessonId, Long sectionId) {
//1 首先查缓存
LearningRecord record = taskHandler.readRecordCache(lessonId, sectionId);
//2 读取到则直接返回
if(record != null) return record;
//3 未读取到 查询数据库
record = lambdaQuery()
.eq(LearningRecord::getLessonId, lessonId)
.eq(LearningRecord::getSectionId, sectionId)
.one();
//4 从数据库读取到的记录写入缓存
taskHandler.writeRecordCache(record);
//返回
return record;
}
//处理考试的逻辑 没有用到redis 因为操作DB数据量不大
private boolean handleExamRecord(Long userId, LearningRecordFormDTO recordDTO) {
// 1.转换DTO为PO
LearningRecord record = BeanUtils.copyBean(recordDTO, LearningRecord.class);
// 2.填充数据
record.setUserId(userId);
record.setFinished(true);
record.setFinishTime(recordDTO.getCommitTime());
// 3.写入数据库
boolean success = save(record);
if (!success) {
throw new DbException("新增考试记录失败!");
}
return true;
}
}
目前我们的延迟任务执行还是单线程模式,大家将其改造为线程池模式,核心线程数与CPU核数一致即可。
声明线程池:
然后把延迟任务方法的代码放在线程的run方法中即可:
因篇幅问题不能全部显示,请点此查看更多更全内容