AVI 是一种比较老的视频格式,现代浏览器通常不直接支持 AVI 格式,因此在网页上播放 AVI 视频时,您需要将视频转换为一种更常见的格式(如 MP4),或使用一些第三方的 JavaScript 播放器库。

下面是整个转码服务的运行过程

FFmpeg处理视频

使用FFmpeg进行视频转码操作

分布式任务调度

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

如何保证任务不重复执行?

1)调度中心按分片广播的方式去下发任务
2)执行器收到作北分片广播的参数:分片总教和分片序号,计管任务id除以分片总数得到一个余数.如果余数等于分片序号这时就去执行这个任务,这里保证了不同的执行器执行不同的任务。
3)配置调度过期策路为〝忽略”,避免同一个执行器多次重复执行同—个任务
4)配置任务阻塞处理策略为〝丢弃后续调度〞,注意:丢弃也没事下一次调度就又可以执行了
5)另外还要保证任务处理的幂等性,执行过的任务可以打一个状态标记已完成,下次再调度执行该任务判断该任务已完成就不再执行。

任务幂等性如何保证?

它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。

幂等性是为了解決重复提交问题,比如:恶意刷单,重复支付等。

解决幂等性常用的方案:

1)数据库约束,比如:唯一素引,主键。同一个主键不可能两次都插入成功。
2) 乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
3)唯一序列号,请求前生成唯一的序列号,携带序列号去请求,执行时在redis记录该序列号表示以该序列号的请求执行过了,如果相同的序列号再次来执行说明是重复执行。

这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断

安装XXL-JOB

使用Docker部署,使用下面命令安装:

docker pull xuxueli/xxl-job-admin:2.3.0

使用如下命令启动:

docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://ip:3306/xxl_job?useSSL=false&serverTimezone=UTC&characterEncoding=utf-8&allowPublicKeyRetrieval=true \
--spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver \
--spring.datasource.username=root \
--spring.datasource.password=root" \
-p 8080:8080 \
-v /tmp:/data/applogs \
--name xxl-job-admin \
-d xuxueli/xxl-job-admin:2.3.0

注意修改数据源,其中url中的IP设置为服务器IP地址,如果是本机,请使用本机地址不要使用localhost或者127.0.0.1,同时记得开启mysql的远程访问

use mysql;
update user set host='%' where user='root';
flush privileges
grant all privileges on *.* TO 'root'@'%' with grant option;

访问http://localhost:8080/xxl-job-admin/

创建执行器

进入后台管理,点击右侧执行器管理,新建一个执行器

注册执行器

进入Nacos后台,找到media-service-dev.yaml,添加xxl-Job的配置:

media-service-dev.yaml
# xxl-job配置
xxl:
job:
admin:
# 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
addresses: http://127.0.0.1:8080/xxl-job-admin
executor:
# 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
address:
# 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
appname: media-process-executor
# 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
ip:
# 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
port: 9999
# 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
logpath: /Users/swcode/data/applogs/xxl-job/jobhandler
# 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
logretentiondays: 30
# 执行器通讯TOKEN [选填]:非空时启用;
accessToken:

找到learning-online-media-service模块,在config包下创建XxlJonConfig配置类

XxlJonConfig
@Slf4j
@Configuration
public class XxlJonConfig {

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;


@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}

启动learning-online-media服务,在执行器管理页面可以看到自动注册了一个服务。

添加任务管理

进入后台管理,点击右侧任务管理,新建一个任务

分布式锁

买现分布式锁的方案有很多,常用的如下:
1、基于数据库实现分布锁
利用数据库主键唯一性的特点,或利用数据库唯一索号l、行级锁的特点,比如:多个线程同时向数据库插入主键相同的同一条记录,谁插入成功谁就获取锁,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。

2、基于redis实现锁
redis提供了分布式锁的实现方案,比如:SETNX、set nx、redisson等。
拿SETNX举例说明,SETNX命令的工作过程是去set一个不存在的key,多个线程去设置同一个key只会有一个线程设置成功,设置成功的的线程拿到锁。

3、使用zookeeper实现
zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。

本次我们使用数据库实现分布锁,后边的模块会使用其它方法到时再洋细介绍,

使用乐观锁机制实现

当某次调用者执行成功时,status被置为4,其他执行都会失败;这个执行成功的就拿到了”锁”。

MediaProcessMapper
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {
/**
* 开启一个任务
* @param id 任务id
* @return int 更新成功条数
*/
@Update("update media_process set status = '4' where (status='1' or status='3') and fail_count<3 and id=#{id}")
int startTask(@Param("id") Long id);
}

Service包一层

MediaProcessService
public interface MediaProcessService extends IService<MediaProcess> {
/**
* 开启一个任务
* @param id 任务id
* @return true 开启任务成功,false开启任务失败
*/
public boolean startTask(long id);
}

实现该方法

MediaProcessServiceImpl
@Service
public class MediaProcessServiceImpl extends ServiceImpl<MediaProcessMapper, MediaProcess> implements MediaProcessService {
/**
* 开启一个任务
*
* @param id 任务id
* @return true 开启任务成功,false开启任务失败
*/
@Override
public boolean startTask(long id) {
return baseMapper.startTask(id) > 0;
}
}

实现添加任务

在文件合并完成之后,将数据保存到media_files同时,判断视频文件是否为.avi,如果是需要进行转码操作,将其添加到任务表media_process

找到MediaFilesServiceImpl,修改saveAfterStore方法

MediaFilesServiceImpl
@Override
public MediaFiles saveAfterStore(UploadFileParamDTO dto, Long companyId, String fileMd5, String bucket, String path) {
// 构造参数
MediaFiles mediaFiles = new MediaFiles();
BeanUtils.copyProperties(dto, mediaFiles);
mediaFiles.setId(fileMd5);
mediaFiles.setCompanyId(companyId);
mediaFiles.setBucket(bucket);
mediaFiles.setFilePath(path);
mediaFiles.setFileId(fileMd5);
mediaFiles.setUrl("/" + bucket + "/" + path);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setStatus(MediaFileStatusEnum.NORMAL.status());
mediaFiles.setAuditStatus("002003");
// 保存到媒资文件表
boolean save = save(mediaFiles);
if (!save) {
log.error("想数据库保存文件失败,bucket: {},文件名: {}", bucket, path);
return null;
}
// 记录待处理任务
addWaitingTask(mediaFiles);
return mediaFiles;
}

/**
* 添加文件转码待处理任务
* @param mediaFiles 文件信息
*/
private void addWaitingTask(MediaFiles mediaFiles) {
// 获取文件的mimeType, 如果是avi视频,写入待处理任务
String filename = mediaFiles.getFilename();
String suffix = filename.substring(filename.lastIndexOf("."));
String mineType = getMineType(suffix);
if (!mineType.equals("video/x-msvideo")) {
return;
}
MediaProcess mediaProcess = new MediaProcess();
BeanUtils.copyProperties(mediaProcess, mediaFiles);
mediaProcess.setStatus("1");
mediaProcess.setCreateDate(LocalDateTime.now());
mediaProcess.setFailCount(0);
mediaProcess.setUrl(null);
mediaProcessService.save(mediaProcess);
}

更新任务状态

任务处理完成需要更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录。

创建MediaProcessSaveService接口添加方法

MediaProcessSaveService.java
public interface MediaProcessSaveService {

/**
* 保存任务结果
*
* @param taskId 任务ID
* @param status 任务状态
* @param fileId 文件id
* @param url 转码路径
* @param errorMsg 错误信息
*/
public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg);
}

创建实现类MediaProcessSaveServiceImpl实现保存方法:

MediaProcessSaveServiceImpl
@Service
public class MediaProcessSaveServiceImpl implements MediaProcessSaveService {

private final MediaFilesService mediaFilesService;
private final MediaProcessService mediaProcessService;
private final MediaProcessHistoryService mediaProcessHistoryService;

public MediaProcessSaveServiceImpl(MediaFilesService mediaFilesService, MediaProcessService mediaProcessService,
MediaProcessHistoryService mediaProcessHistoryService) {
this.mediaFilesService = mediaFilesService;
this.mediaProcessService = mediaProcessService;
this.mediaProcessHistoryService = mediaProcessHistoryService;
}

/**
* 保存任务结果
*
* @param taskId 任务ID
* @param status 任务状态
* @param fileId 文件id
* @param url 转码路径
* @param errorMsg 错误信息
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {
MediaProcess dbProcess = mediaProcessService.getById(taskId);
if (dbProcess == null) {
return;
}
// 任务执行失败
if (status.equals(ProcessStatus.PROCESS_FAIL.status())) {
// 更新失败次数
mediaProcessService.update(Wrappers.<MediaProcess>lambdaUpdate()
.eq(MediaProcess::getId, taskId)
.set(MediaProcess::getStatus, status)
.set(MediaProcess::getFailCount, dbProcess.getFailCount() + 1)
.set(MediaProcess::getErrormsg, errorMsg));
return;
}
// 任务执行成功
MediaFiles mediaFiles = mediaFilesService.getById(fileId);
if (mediaFiles == null) {
return;
}

// 更新media_file中的url
mediaFilesService.update(Wrappers.<MediaFiles>lambdaUpdate().eq(MediaFiles::getId, fileId).set(MediaFiles::getUrl, url));
// 将MediaProcess记录插入到MediaProcessHistory中
dbProcess.setStatus(status);
dbProcess.setFinishDate(LocalDateTime.now());
dbProcess.setUrl(url);
MediaProcessHistory processHistory = new MediaProcessHistory();
BeanUtils.copyProperties(dbProcess, processHistory);
mediaProcessHistoryService.save(processHistory);

// 删除MediaProcess记录
mediaFilesService.removeById(taskId);
}
}

实现获取任务

从数据库中获取任务,根据分片数量和当前分片序号确定当前分片要处理的任务。

找到MediaProcessMapper,添加查询任务方法:

MediaProcessMapper
/**
* 查询当前分片的任务
*
* @param shardTotal 分片总数
* @param shardIndex 当前分片序号
* @param count 任务数量
* @return List<MediaProcess>
*/
@Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and t.status <> 2 and t.fail_count < 3 limit #{count}")
List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count);

包一层MediaProcessService

MediaProcessService
/**
* 获取待处理任务
*
* @param shardTotal 分片总数
* @param shardIndex 当前分片序号
* @param count 任务数量
* @return List<MediaProcess>
*/
List<MediaProcess> getMediaProcessList(int shardTotal, int shardIndex, int count);

实现该方法,MediaProcessServiceImpl

MediaProcessServiceImpl
/**
* 获取待处理任务
*
* @param shardTotal 分片总数
* @param shardIndex 当前分片序号
* @param count 任务数量
* @return List<MediaProcess>
*/
@Override
public List<MediaProcess> getMediaProcessList(int shardTotal, int shardIndex, int count) {
return baseMapper.selectListByShardIndex(shardTotal, shardIndex, count);
}

实现调度任务

创建包com.swx.media.service.jobhandle,并创建视频编码任务VideoTask

  • 根据系统CPU核心数,确定从数据库查询多少任务,如果任务为0则直接返回;
  • 根据获取的任务数,开启对应数量的线程数;
  • 使用计数器,确保所有任务执行完成后再结束方法;
  • 循环所有任务,指定线程取执行如下步骤:
    • 获取任务锁,实现幂等性,防止重复执行,失败,写入数据库,返回;
    • 从MinIO下载要转码的视频,下载失败,写入数据库,返回;
    • 使用FFmpeg进行视频转码,保存到临时文件;转码失败,写入数据库,返回;
    • 将转码之后的文件上传到MinIO中,上传失败,写入数据库,返回;
    • 上传成功,写入数据库。
  • 使用try finally包裹线程方法,在finally中将计数器减1;
  • 使用计数器阻塞进程,并设置最大等待时间,无论线程中计数器是否减1,最大时间过后释放线程。
VideoTask
/**
* 视频转码任务处理类
*/
@Slf4j
@Component
public class VideoTask {

private final MediaProcessService mediaProcessService;
private final FileStorageService fileStorageService;
private final MediaProcessSaveService mediaProcessSaveService;

public VideoTask(MediaProcessService mediaProcessService, FileStorageService fileStorageService, MediaProcessSaveService mediaProcessSaveService) {
this.mediaProcessService = mediaProcessService;
this.fileStorageService = fileStorageService;
this.mediaProcessSaveService = mediaProcessSaveService;
}

/**
* 视频处理任务
*/
@XxlJob("videoJobHandler")
public void videoJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

// 确定CPU核心数
int processors = Runtime.getRuntime().availableProcessors();
// 拉取任务
List<MediaProcess> mediaProcessList = mediaProcessService.getMediaProcessList(shardIndex, shardTotal, processors);
int size = mediaProcessList.size();
if (size == 0) {
log.debug("取到的视频处理任务数:{}", size);
return;
}
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(size);
// 使用计数器,等待所有线程执行完再结束方法
CountDownLatch countDownLatch = new CountDownLatch(size);
for (MediaProcess mediaProcess : mediaProcessList) {
// 将任务加入线程池
executorService.execute(() -> {
try {

// 获取任务锁,幂等性操作,避免重复执行
Long taskId = mediaProcess.getId();
boolean lock = mediaProcessService.startTask(taskId);
String fileId = mediaProcess.getFileId();
if (!lock) {
log.debug("抢占任务失败,任务id: {}", taskId);
return;
}
// 下载MinIO视频
String bucket = mediaProcess.getBucket();
String objectName = mediaProcess.getFilePath();
File file = fileStorageService.downloadFile(bucket, objectName);
if (file == null) {
log.error("下载视频出错,任务id: {}, bucket: {}, objectName: {}", taskId, bucket, objectName);
// 保存任务处理失败结果
mediaProcessSaveService.saveProcessFinishStatus(taskId, ProcessStatus.PROCESS_FAIL.status(),
fileId, null, "下载视频到本地失败");
return;
}
// 视频转码
// 源avi视频的路径
File sourcePath = file.getAbsoluteFile();
File targetFile = null;
try {
targetFile = File.createTempFile("minio", ".mp4");
} catch (IOException e) {
log.error("创建临时文件异常, ", e);
// 保存任务处理失败结果
mediaProcessSaveService.saveProcessFinishStatus(taskId, ProcessStatus.PROCESS_FAIL.status(),
fileId, null, "创建临时文件异常");
return;
}
String targetPath = targetFile.getAbsolutePath();
// 开始转换
String result = "success";
if (!result.equals("success")) {
log.error("视频转码失败, bucket: {}, objectName: {}, 原因: {}", taskId, bucket, result);
// 保存任务处理失败结果
mediaProcessSaveService.saveProcessFinishStatus(taskId, ProcessStatus.PROCESS_FAIL.status(),
fileId, null, result);
return;
}

// 上传到MinIO
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(targetPath);
} catch (FileNotFoundException e) {
log.error("转码后的文件不存在, 文件路径: {}", targetPath);
// 保存任务处理失败结果
mediaProcessSaveService.saveProcessFinishStatus(taskId, ProcessStatus.PROCESS_FAIL.status(),
fileId, null, "转码后的文件不存在");
return;
}
// 开始上传
boolean isUploadMinIO = fileStorageService.uploadVideoFile(objectName, "video/mp4", fileInputStream);
if (!isUploadMinIO) {
log.error("上传mp4到MinIO失败, taskId: {}", taskId);
// 保存任务处理失败结果
mediaProcessSaveService.saveProcessFinishStatus(taskId, ProcessStatus.PROCESS_FAIL.status(),
fileId, null, "上传mp4到MinIO失败");
return;
}
// 保存任务成功结果,mp4文件的url
String url = getFilePathByMd5(fileId, ".mp4");
mediaProcessSaveService.saveProcessFinishStatus(taskId, ProcessStatus.PROCESS_SUCCESS.status(),
fileId, url, null);
} finally {
// 线程任务完成,计算器-1
countDownLatch.countDown();
}
});
}

// 阻塞,最多等待30分钟
countDownLatch.await(30, TimeUnit.MINUTES);
}

/**
* 得到MinIO文件的路径
*
* @param fileMd5 源文件Md5
* @param suffix 文件后缀
* @return 分块存储路径
*/
private String getFilePathByMd5(String fileMd5, String suffix) {
return fileMd5.charAt(0) + "/" + fileMd5.charAt(1) + "/" + fileMd5 + "/" + fileMd5 + suffix;
}
}