将初始化操作设计为一次性接口,需要初始化时,访问接口并传入不同的参数,达到初始化不同数据到Elasticsearch
找到模块:trip-search-service,在包:com.swx.search.controller
下创建类:ElasticsearchDataInitController
使用策略模式 实现不用type参数,访问不同的方法,完成不同数据的初始化到ES的操作。
为了保证接口的安全,将url参数配置到配置文件中
使用 Redis 保证该接口只能被访问一次,使用 SET NX 保证幂等性。
ElasticsearchDataInitController @Slf4j @RestController @RequestMapping("/init") @RefreshScope public class ElasticsearchDataInitController { public static final String INIT_USER = "user" ; public static final String INIT_TRAVEL = "travel" ; public static final String INIT_STRATEGY = "strategy" ; public static final String INIT_DESTINATION = "destination" ; public static final Integer BATCH_COUNT = 200 ; private final Map<String, EsDataInitStrategy> DATA_HANDLER_STRATEGY_MAP = new HashMap <>(); @Value("${es.init.key}") private String initKey; private final RedisService redisService; private final UserInfoFeignService userInfoFeignService; private final ArticleFeignService articleFeignService; private final ElasticsearchService elasticsearchService; public ElasticsearchDataInitController (RedisService redisService, UserInfoFeignService userInfoFeignService, ArticleFeignService articleFeignService, ElasticsearchService elasticsearchService) { this .redisService = redisService; this .userInfoFeignService = userInfoFeignService; this .articleFeignService = articleFeignService; this .elasticsearchService = elasticsearchService; } @Getter @Setter static class EsDataInitStrategy { private Function<QueryObject, R<List<Object>>> function; private Class<?> clazz; public EsDataInitStrategy (Function<QueryObject, R<List<Object>>> function, Class<?> clazz) { this .function = function; this .clazz = clazz; } } @PostConstruct public void postConstruct () { EsDataInitStrategy userInit = new EsDataInitStrategy ((qo) -> userInfoFeignService.findList(qo.getCurrent(), qo.getSize()), UserInfoEs.class); DATA_HANDLER_STRATEGY_MAP.put(INIT_USER, userInit); EsDataInitStrategy strategyInit = new EsDataInitStrategy (articleFeignService::strategySearch, StrategyEs.class); DATA_HANDLER_STRATEGY_MAP.put(INIT_STRATEGY, strategyInit); EsDataInitStrategy travelInit = new EsDataInitStrategy (articleFeignService::travelSearch, TravelEs.class); DATA_HANDLER_STRATEGY_MAP.put(INIT_TRAVEL, travelInit); EsDataInitStrategy destinationInit = new EsDataInitStrategy (articleFeignService::destinationSearch, DestinationEs.class); DATA_HANDLER_STRATEGY_MAP.put(INIT_DESTINATION, destinationInit); } @GetMapping("/{key}/{type}") public ResponseEntity<?> init(@PathVariable("key") String key, @PathVariable("type") String type) { log.info("[ES 数据初始化] -------------------- 数据初始化开始 --------------------" ); if (StringUtils.isEmpty(key) || !initKey.equals(key)) { log.warn("[ES 数据初始化] 非法操作,请求参数有误 key={}, type={}, initKey={}" , key, type, initKey); return ResponseEntity.status(HttpStatus.FORBIDDEN).build(); } String redisKey = "es:init:" + key + type; Boolean ret = redisService.setnx(redisKey, "initialized" ); if (ret == null || !ret) { log.warn("[ES 数据初始化] 非法操作,已初始化过, redisKey={}, ret={}" , redisKey, ret); return ResponseEntity.status(HttpStatus.NOT_FOUND).build(); } this .doInit(type); log.info("[ES 数据初始化] -------------------- 数据初始化完成 --------------------" ); return ResponseEntity.ok().body("init success" ); } private void doInit (String type) { int current = 1 ; do { List<Object> list = handleRemoteDataList(current++, type); if (list == null || list.isEmpty()) { log.info("[ES 数据初始化] 数据初始化完成." ); return ; } elasticsearchService.save(list); } while (true ); } private List<Object> handleRemoteDataList (Integer current, String type) { EsDataInitStrategy strategy = DATA_HANDLER_STRATEGY_MAP.get(type); if (strategy == null ) { throw new BizException ("初始化参数类型错误" ); } R<List<Object>> ret = strategy.getFunction().apply(new QueryObject (current, BATCH_COUNT)); log.info("[ES 数据初始化] 初始化开始,查询{}数据 data={}" , type, JSON.toJSONString(ret)); List<Object> list = ret.checkAndGet(); if (list == null || list.isEmpty()) { return list; } List<Object> dataList = new ArrayList <>(list.size()); try { Class<?> clazz = strategy.getClazz(); for (Object dto : list) { Object es = clazz.getDeclaredConstructor().newInstance(); BeanUtils.copyProperties(es, dto); dataList.add(es); } return dataList; } catch (Exception e) { e.printStackTrace(); } return null ; } }
Nacos配置
在 Nacos 配置中心中找到 search-service-dev.yaml,添加如下配置
es: init: key: hg345f67gfdh5yth34g56
远程调用
创建包:com.swx.search.feign
,创建远程调用 文章微服务 的接口: ArticleFeignService
找到 trip-article-server 模块,在 DestinationController 添加查询方法。
DestinationController @PostMapping("/search") public R<List<Destination>> forSearchService (@RequestBody QueryObject qo) { return R.ok(destinationService.list(Wrappers.<Destination>query().last("limit " + qo.getOffset() + ", " + qo.getSize()))); }
找到 trip-article-server 模块,在 StrategyController 添加查询方法。
StrategyController @PostMapping("/search") public R<List<Strategy>> forSearchService (@RequestBody QueryObject qo) { return R.ok(strategyService.list(Wrappers.<Strategy>query().last("limit " + qo.getOffset() + ", " + qo.getSize()))); }
找到 trip-article-server 模块,在 TravelController 添加查询方法。
TravelController @PostMapping("/search") public R<List<Travel>> forSearchService (@RequestBody QueryObject qo) { return R.ok(travelService.list(Wrappers.<Travel>query().last("limit " + qo.getOffset() + ", " + qo.getSize()))); }
@FeignClient("article-service") public interface ArticleFeignService { @GetMapping("/destinations/search") public R<List<Object>> destinationSearch (@RequestBody QueryObject qo) ; @PostMapping("/strategies/search") public R<List<Object>> strategySearch (@RequestBody QueryObject qo) ; @PostMapping("/travels/search") public R<List<Object>> travelSearch (@RequestBody QueryObject qo) ; }
创建远程调用 用户微服务 的接口: UserInfoFeignService
找到 trip-user-server 模块,在 UserInfoController 添加查询方法。
UserInfoController >@GetMapping >public R<List<UserInfoDTO>> findList (Integer current, Integer limit) { int offset = (current - 1 ) * limit; List<UserInfo> list = userInfoService.list(Wrappers.<UserInfo>query().last("limit " + offset + ", " + limit)); List<UserInfoDTO> dtoList = list.stream().map(UserInfo::toDto).collect(Collectors.toList()); return R.ok(dtoList); >}
UserInfoFeignService @FeignClient("user-service") public interface UserInfoFeignService { @GetMapping("/users/") R<List<Object>> findList (@RequestParam Integer current, @RequestParam Integer limit) ; }
远程调用时接收的类型为Object时,Feign 会将返回的 JSON 结构转换为 LinkedHashMap 对象,在进行属性拷贝时使用了 BeanUtils,但是org.springframework.beans.BeanUtils
不支持map到对象的拷贝
for (Object dto : list) { Object es = clazz.getDeclaredConstructor().newInstance(); BeanUtils.copyProperties(es, dto); dataList.add(es); }
这里使用 apache 提供的 BeanUtils
<dependency > <groupId > commons-beanutils</groupId > <artifactId > commons-beanutils</artifactId > <version > 1.9.4</version > </dependency >
其在拷贝时做了判断
if (orig instanceof DynaBean) { ....; } else if (orig instanceof Map) { ....; } else { ....; }
现在访问如下接口,完成数据的初始化操作:建立索引,导入数据
GET http://localhost:9000/search/init/hg345f67gfdh5yth34g56/destination http://localhost:9000/search/init/hg345f67gfdh5yth34g56/user http://localhost:9000/search/init/hg345f67gfdh5yth34g56/travel http://localhost:9000/search/init/hg345f67gfdh5yth34g56/strategy