使用策略模式设计Elasticsearch数据初始化接口
接口要求:
- 使用策略模式,请求携带不同参数,接口执行不同数据的初始化方法
- 接口只能使用一次,即初始化完成之后,接口无法再调用
策略类
创建策略类,QueryObject 为查询参数,clazz为ES文档类,用于类型转换
EsDataInitStrategy@Getter @Setter static class EsDataInitStrategy { private Function<QueryObject, R<List<Object>>> function; private Class<?> clazz;
public EsDataInitStrategy(Function<QueryObject, R<List<Object>>> function, Class<?> clazz) { this.function = function; this.clazz = clazz; } }
|
查询参数 QueryObject
QueryObject@Getter @Setter @NoArgsConstructor public class QueryObject { private String keyword; private Integer current = 1; private Integer size = 10;
public QueryObject(Integer current, Integer size) { this.current = current; this.size = size; } public Integer getOffset() { return (current - 1) * size; } }
|
响应参数 R
R@Setter @Getter @NoArgsConstructor public class R<T> { public static final int CODE_SUCCESS = 200; public static final String MSG_SUCCESS = "操作成功"; public static final int CODE_NOLOGIN = 401; public static final String MSG_NOLOGIN = "请先登陆"; public static final int CODE_ERROR = 500; public static final int CODE_REGISTER_ERROR = 500; public static final String MSG_ERROR = "系统异常,请联系管理员"; public static final int CODE_ERROR_PARAM = 501;
private int code; private String msg; private T data;
public R(int code, String msg, T data) { this.code = code; this.msg = msg; this.data = data; }
public static <T> R<T> ok(T data) { return new R<>(CODE_SUCCESS, MSG_SUCCESS, data); }
public static <T> R<T> ok() { return new R<>(CODE_SUCCESS, MSG_SUCCESS, null); }
public static <T> R<T> error(int code, String msg, T data) { return new R<>(code, msg, data); }
public static <T> R<T> error(int code, String msg) { return new R<>(code, msg, null); }
public static <T> R<T> defaultError() { return new R<>(CODE_ERROR, MSG_ERROR, null); }
public static <T> R<T> noLogin() { return new R<>(CODE_NOLOGIN, MSG_NOLOGIN, null); }
public static <T> R<T> noPermission() { return new R<>(403, "非法访问", null); } public T checkAndGet() { if (this.code != CODE_SUCCESS) { throw new BizException(code, msg); } return data; } }
|
远程调用
初始化接口所在微服务可能与需要初始化的数据不在不同,所用数据库不同。为此需要使用Feign远程调用获取数据
用户服务
以获取用户信息为案例,远程调用用户微服务user-service
,分页拉取用户数据
注意,因为使用策略模式,这里的返回结果为Object,也因此 Feign 会将返回的 JSON 结构转换为 LinkedHashMap 对象
UserInfoFeignService@FeignClient("user-service") public interface UserInfoFeignService {
@GetMapping("/users/") R<List<Object>> findList(@RequestParam Integer current, @RequestParam Integer limit); }
|
文章微服务
以获取文章信息为案例,远程调用文章微服务user-service
,分页拉取文章数据
使用Post请求,同时添加 @RequestBody 注解,QueryObject 中包含分页参数。
@FeignClient("article-service") public interface ArticleFeignService {
@PostMapping("/articles/search") public R<List<Object>> articleSearch(@RequestBody QueryObject qo); }
|
Redis服务
接口被设计为只能访问一次,如果项目中使用了Redis,可以将该状态保存到Redis中,同时保证接口幂等性,即并发访问初始化接口,也只能初始化一次。
RedisService@Component public class RedisService { @Autowired public RedisTemplate redisTemplate; public Boolean setnx(String key, String value) { return redisTemplate.opsForValue().setIfAbsent(key, value); } }
|
为了防止接口滥用,可以在配置文件中配置一个随机接口地址:
es: init: key: hg345f67gfdh5yth34g56
|
Elasticsearch服务
直接使用 ElasticsearchRestTemplate 的批量保存方法
ElasticsearchServicepublic interface ElasticsearchService {
public void save(Iterable<?> iterable); }
|
实现方法
ElasticsearchServiceImpl@Service public class ElasticsearchServiceImpl implements ElasticsearchService { @Autowired private ElasticsearchRestTemplate template; @Override public void save(Iterable<?> iterable) { template.save(iterable); } }
|
接口设计
@Slf4j @RestController @RequestMapping("/init") @RefreshScope public class ElasticsearchDataInitController {
public static final String INIT_USER = "user"; public static final String INIT_ARTICLE = "article"; public static final Integer BATCH_COUNT = 200; private final Map<String, EsDataInitStrategy> DATA_HANDLER_STRATEGY_MAP = new HashMap<>();
@Value("${es.init.key}") private String initKey; private final RedisService redisService; private final UserInfoFeignService userInfoFeignService; private final ArticleFeignService articleFeignService; private final ElasticsearchService elasticsearchService;
public ElasticsearchDataInitController(RedisService redisService, UserInfoFeignService userInfoFeignService, ArticleFeignService articleFeignService, ElasticsearchService elasticsearchService) { this.redisService = redisService; this.userInfoFeignService = userInfoFeignService; this.articleFeignService = articleFeignService; this.elasticsearchService = elasticsearchService; }
@PostConstruct public void postConstruct() { EsDataInitStrategy userInit = new EsDataInitStrategy((qo) -> userInfoFeignService.findList(qo.getCurrent(), qo.getSize()), UserInfoEs.class); DATA_HANDLER_STRATEGY_MAP.put(INIT_USER, userInit); EsDataInitStrategy articleInit = new EsDataInitStrategy(articleFeignService::articleSearch, ArticleEs.class); DATA_HANDLER_STRATEGY_MAP.put(INIT_ARTICLE, articleInit); }
@GetMapping("/{key}/{type}") public ResponseEntity<?> init(@PathVariable("key") String key, @PathVariable("type") String type) { log.info("[ES 数据初始化] -------------------- 数据初始化开始 --------------------"); if (StringUtils.isEmpty(key) || !initKey.equals(key)) { log.warn("[ES 数据初始化] 非法操作,请求参数有误 key={}, type={}, initKey={}", key, type, initKey); return ResponseEntity.status(HttpStatus.FORBIDDEN).build(); } String redisKey = "es:init:" + key + type; Boolean ret = redisService.setnx(redisKey, "initialized"); if (ret == null || !ret) { log.warn("[ES 数据初始化] 非法操作,已初始化过, redisKey={}, ret={}", redisKey, ret); return ResponseEntity.status(HttpStatus.NOT_FOUND).build(); }
this.doInit(type); log.info("[ES 数据初始化] -------------------- 数据初始化完成 --------------------"); return ResponseEntity.ok().body("init success"); }
private void doInit(String type) { int current = 1; do { List<Object> list = handleRemoteDataList(current++, type); if (list == null || list.isEmpty()) { log.info("[ES 数据初始化] 数据初始化完成."); return; } elasticsearchService.save(list); } while (true);
}
private List<Object> handleRemoteDataList(Integer current, String type) { EsDataInitStrategy strategy = DATA_HANDLER_STRATEGY_MAP.get(type); if (strategy == null) { throw new BizException("初始化参数类型错误"); } R<List<Object>> ret = strategy.getFunction().apply(new QueryObject(current, BATCH_COUNT)); log.info("[ES 数据初始化] 初始化开始,查询{}数据 data={}", type, JSON.toJSONString(ret)); List<Object> list = ret.checkAndGet(); if (list == null || list.isEmpty()) { return list; } List<Object> dataList = new ArrayList<>(list.size()); try { Class<?> clazz = strategy.getClazz(); for (Object dto : list) { Object es = clazz.getDeclaredConstructor().newInstance(); BeanUtils.copyProperties(es, dto); dataList.add(es); } return dataList; } catch (Exception e) { e.printStackTrace(); } return null; } }
|
食用教程
初始化用户数据
curl --request GET \ --url http://localhost:9000/search/init/hg345f67gfdh5yth34g56/user
|
初始化文章数据
curl --request GET \ --url http://localhost:9000/search/init/hg345f67gfdh5yth34g56/article
|