断点续传

通常视频文件都比较大,所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制,但是客户的网络环境质量、电脑硬件环境等参差不齐,如果一个大文件快上传完了网断了没有上传完成,需要客户重新上传,用户体验非常差,所以对于大文件上传的要求最基本的是断点续传。

什么是断点续传:

引用百度百科:断点续传指的是在下载或上传时,将下载或上传任务(一个文件或一个压縮包)人为的划分为几个部分,每一个部分采用一个线程进行上传或下载,如果碰到网络故障,可以从已经上传或下载的部分开始继续上传下载末完成的部分,而没有必要从头开始年传下载,断点续传可以提高节省操作时间,提高用户体验性。

业务流程

1、前端会发起checkFile请求,后端检查文件是否在数据库和MinIO中,如果不在直接返回false,在数据库中但不在MinIO中也返回false。ID为文件MD5。

2、文件不存在,前端将文件分块;分块上传前会再次检查分片是否在MinIO中,如果不存在返回false

3、分块不存在,前端发起分块上传请求,后端将文件转存至MinIO中;分块存在,前端继续重复2-3步骤,直至所有分块上传完成。

4、分块上传完成后,前端发起合并文件请求,后端向MinIO发起合并请求;合并完成后,将数据保存到数据库中。

5、后端向MinIO中发起删除分块请求。

接口信息

文件上传前的检查文件

路径地址 http://localhost:63040/media/upload/checkfile
请求方式 POST
请求参数 fileMd5
返回结果 Boolean

分块文件上传前的检查文件

路径地址 http://localhost:63040/media/upload/checkchunk
请求方式 POST
请求参数 fileMd5、chunk
返回结果 Boolean

上传分块文件

路径地址 http://localhost:63040/media/upload/uploadchunk
请求方式 POST
请求参数 MultipartFile、fileMd5、chunk
返回结果 Boolean

合并分块文件

路径地址 http://localhost:63040/media/upload/uploadchunk
请求方式 POST
请求参数 fileName、fileMd5、chunkTotal
返回结果 R

定义Service

文件存储操作

定义上传媒体文件的方法,找到com.swx.media.service包下的FileStorageService接口,定义如下方法:

FileStorageService
/**
* 上传视频分块文件
*
* @param path 文件路径
* @param inputStream 文件流
* @return 文件全路径
*/
public String uploadChunkFile(String path, String mimeType, InputStream inputStream);

/**
* 合并文件
*
* @param folder 分片目录路径
* @param filepath 合并文件路径
* @param chunkSize 分片数量
*/
public void mergeFile(String folder, String filepath, int chunkSize) throws Exception;

/**
* 获取文件信息
*
* @param bucket 桶
* @param filepath 文件路径
* @return 文件信息
*/
public ObjectStat getObjectStat(String bucket, String filepath);

/**
* 清理分块文件
*
* @param chunkFolder 分块目录
* @param chunkTotal 分块数量
*/
void clearChunkFiles(String chunkFolder, int chunkTotal);

实现该方法

MinIOFileStorageService
private final MinioClient minioClient;
private final MinIOConfigProperties minIOConfigProperties;
private final static String separator = "/";

public MinIOFileStorageService(MinioClient minioClient, MinIOConfigProperties minIOConfigProperties) {
this.minioClient = minioClient;
this.minIOConfigProperties = minIOConfigProperties;
}
/**
* 上传视频分块文件
*
* @param path 文件名
* @param inputStream 文件流
* @return 文件全路径
*/
@Override
public String uploadChunkFile(String path, String mimeType, InputStream inputStream) {
try {
PutObjectArgs putObjectArgs = PutObjectArgs.builder()
.object(path)
.bucket(minIOConfigProperties.getBucket().get("video"))
.stream(inputStream, inputStream.available(), -1)
.contentType(mimeType)
.build();
minioClient.putObject(putObjectArgs);
return path;
} catch (Exception ex) {
log.error("minio put file error.", ex);
throw new RuntimeException("上传文件失败");
}
}

/**
* 合并文件
*
* @param folder 分片目录路径
* @param filepath 合并文件路径
* @param chunkSize 分片数量
* @return bucket
*/
@Override
public void mergeFile(String folder, String filepath, int chunkSize) throws Exception {
try {
String bucket = minIOConfigProperties.getBucket().get("video");
// 源文件
List<ComposeSource> sources = Stream.iterate(0, i -> ++i).limit(chunkSize)
.map(i -> ComposeSource.builder().bucket(bucket).object(folder + i).build()).collect(Collectors.toList());
// 合并
ComposeObjectArgs composeObjectArgs = ComposeObjectArgs.builder()
.object(filepath)
.bucket(bucket)
.sources(sources)
.build();
minioClient.composeObject(composeObjectArgs);
} catch (Exception ex) {
log.error("minio compose file error.", ex);
throw new Exception("合并文件失败");
}
}

/**
* 获取文件信息
*
* @param bucket 桶
* @param filepath 文件路径
* @return 文件信息
*/
@Override
public ObjectStat getObjectStat(String bucket, String filepath) {
try {
return minioClient.statObject(
StatObjectArgs.builder().bucket("video").object(filepath).build());
} catch (Exception e) {
log.error("文件信息获取失败, 桶: {}, 文件路径: {}", bucket, filepath, e);
return null;
}
}

/**
* 清理分块文件
*
* @param chunkFolder 分块目录
* @param chunkTotal 分块数量
*/
@Override
@Async
public void clearChunkFiles(String chunkFolder, int chunkTotal) {
// 构建清理item
List<DeleteObject> objects = Stream.iterate(0, i -> ++i).limit(chunkTotal)
.map(i -> new DeleteObject(chunkFolder.concat(Integer.toString(i)))).collect(Collectors.toList());

RemoveObjectsArgs removeObjectsArgs = RemoveObjectsArgs.builder()
.bucket(minIOConfigProperties.getBucket().get("video"))
.objects(objects)
.build();
// 清理文件
Iterable<Result<DeleteError>> results = minioClient.removeObjects(removeObjectsArgs);
// 需要迭代返回的可迭代来执行删除。
for (Result<DeleteError> result : results) {
DeleteError error = null;
try {
error = result.get();
log.info("Error in deleting object {}; {}", error.objectName(), error.message());
} catch (Exception e) {
log.error("清理文件出错", e);
}

}
}

文件上传方法

定义上述方法,找到com.swx.media.service包下的MediaFilesService接口,定义如下方法:

MediaFilesService
/**
* 检查文件是否存在
*
* @param fileMd5 文件md5值
* @return false不存在,true存在
*/
Boolean checkFile(String fileMd5);

/**
* 检查文件分块是否存在
*
* @param fileMd5 文件md5
* @param chunk 分块序号
* @return false不存在,true存在
*/
Boolean checkChunk(String fileMd5, int chunk);

/**
* 上传分块
*
* @param file 文件信息
* @param fileMd5 文件md5
* @param chunk 分块序号
*/
Boolean uploadChunk(MultipartFile file, String fileMd5, int chunk);

/**
* 合并分块
*
* @param companyId 机构ID
* @param fileMd5 文件md5
* @param chunkTotal 分块数量
* @param dto 文件信息
*/
R mergeChunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamDTO dto);

实现该方法

MediaFilesServiceImpl
private final FileStorageService fileStorageService;
private final TransactionTemplate transactionTemplate;

private final MinIOConfigProperties minIOConfigProperties;

public MediaFilesServiceImpl(FileStorageService fileStorageService, TransactionTemplate transactionTemplate, MinIOConfigProperties minIOConfigProperties) {
this.fileStorageService = fileStorageService;
this.transactionTemplate = transactionTemplate;
this.minIOConfigProperties = minIOConfigProperties;
}

/**
* 检查文件是否存在
*
* @param fileMd5 文件md5值
* @return false不存在,true存在
*/
@Override
public Boolean checkFile(String fileMd5) {
MediaFiles dbMediaFile = getById(fileMd5);
if (dbMediaFile != null) {
// 文件存在数据库,查询MinIO
ObjectStat objectStat = fileStorageService.getObjectStat(dbMediaFile.getBucket(), dbMediaFile.getFilePath());
return objectStat != null;
}
return false;
}

/**
* 检查文件分块是否存在
*
* @param fileMd5 文件md5
* @param chunk 分块序号
* @return false不存在,true存在
*/
@Override
public Boolean checkChunk(String fileMd5, int chunk) {
// 分块存储路径:md5前两位为两个目录,chunk存储分块文件
String path = getChunkFileFolderPath(fileMd5) + chunk;
String bucket = minIOConfigProperties.getBucket().get("video");
ObjectStat objectStat = fileStorageService.getObjectStat(bucket, path);
return objectStat != null;
}

/**
* 上传分块
*
* @param file 文件信息
* @param fileMd5 文件md5
* @param chunk 分块序号
*/
@Override
public Boolean uploadChunk(MultipartFile file, String fileMd5, int chunk) {
try {
String mineType = getMineType(null);
String chunkFilePath = getChunkFileFolderPath(fileMd5) + chunk;
fileStorageService.uploadChunkFile(chunkFilePath, mineType, file.getInputStream());
return true;
} catch (IOException e) {
log.error("分块文件上传失败,文件ID:{},分块序号: {}", fileMd5, chunk, e);
return false;
}
}

/**
* 合并分块
*
* @param companyId 机构ID
* @param fileMd5 文件md5
* @param chunkTotal 分块数量
* @param dto 文件信息
*/
@Override
public R mergeChunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamDTO dto) {
// 找到分块文件调用minio的sdk进行文件合并
String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
String suffix = dto.getFilename().substring(dto.getFilename().lastIndexOf("."));
String filepath = getFilePathByMd5(fileMd5, suffix);
String bucket = minIOConfigProperties.getBucket().get("video");
try {
// 请求合并操作
fileStorageService.mergeFile(chunkFileFolderPath, filepath, chunkTotal);
} catch (Exception e) {
log.error("文件合并失败,文件名: {}", filepath, e);
return R.fail(false, "文件合并失败");
}

// 获取分片文件
ObjectStat objectStat = fileStorageService.getObjectStat(bucket, filepath);
if (objectStat == null) {
return R.fail(false, "合并文件获取失败");
}
// 设置文件大小
dto.setFileSize(objectStat.length());

// 将文件信息入库
// 保存到数据库,使用编程式事务
MediaFiles mediaFiles = transactionTemplate.execute(transactionStatus -> {
return saveAfterStore(dto, companyId, fileMd5, bucket, filepath);
});
if (mediaFiles == null) {
return R.fail(false, "文件保存失败");
}
// 删除分块文件
fileStorageService.clearChunkFiles(chunkFileFolderPath, chunkTotal);
return R.success(true);
}


private String getMineType(String suffix) {
if (suffix == null) suffix = "";
ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(suffix);
String mimeType = MediaType.APPLICATION_OCTET_STREAM_VALUE;
if (extensionMatch != null) {
mimeType = extensionMatch.getMimeType();
}
return mimeType;
}

/**
* 获得文件Md5值
*
* @param inputStream 文件
* @return Md5值
*/
private String getFileMd5(InputStream inputStream) {
try {
return DigestUtils.md5DigestAsHex(inputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* 得到合并后文件的路径
*
* @param fileMd5 源文件Md5
* @param suffix 文件后缀
* @return 分块存储路径
*/
private String getFilePathByMd5(String fileMd5, String suffix) {
return fileMd5.charAt(0) + "/" + fileMd5.charAt(1) + "/" + fileMd5 + "/" + fileMd5 + suffix;
}

/**
* 分块存储路径:md5前两位为两个目录,chunk存储分块文件
*
* @param fileMd5 源文件Md5
* @return 分块存储路径
*/
private String getChunkFileFolderPath(String fileMd5) {
return fileMd5.charAt(0) + "/" + fileMd5.charAt(1) + "/" + fileMd5 + "/chunk/";
}

定义Controller

MediaFilesUploadController
private final MediaFilesService mediaFilesService;

public MediaFilesUploadController(MediaFilesService mediaFilesService) {
this.mediaFilesService = mediaFilesService;
}

@ApiOperation("文件上传前的检查文件")
@PostMapping("/checkfile")
public Boolean checkFile(@RequestParam("fileMd5") String fileMd5) {
return mediaFilesService.checkFile(fileMd5);
}

@ApiOperation("分块文件上传前的检查文件")
@PostMapping("/checkchunk")
public Boolean checkChunk(@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) {
return mediaFilesService.checkChunk(fileMd5, chunk);
}

@ApiOperation("上传分块文件")
@PostMapping("/uploadchunk")
public Boolean uploadChunk(@RequestParam("file") MultipartFile file,
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunk") int chunk) {
return mediaFilesService.uploadChunk(file, fileMd5, chunk);
}

@ApiOperation("合并文件")
@PostMapping("/mergechunks")
public R mergeChunks(@RequestParam("fileName") String fileName,
@RequestParam("fileMd5") String fileMd5,
@RequestParam("chunkTotal") int chunkTotal) {
Long companyId = 1232141425L;
UploadFileParamDTO dto = new UploadFileParamDTO();
dto.setFilename(fileName);
dto.setFileType("001002");
dto.setTags("视频文件");
return mediaFilesService.mergeChunks(companyId, fileMd5, chunkTotal, dto);
}

Push到Git

commit "完成视频上传功能"