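Design the initialization as a one-off endpoint: whenever initialization is needed, call the endpoint with a different parameter to load a different data set into Elasticsearch.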

In the trip-search-service module, create the class ElasticsearchDataInitController in the package com.swx.search.controller.

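Use the strategy pattern so that different values of the type parameter dispatch to different methods, each loading a different kind of data into ES.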
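As a minimal sketch of the dispatch idea, the snippet below keeps a map from the type string to a loader function and rejects unknown types. The class name InitDispatchSketch and the fake loaders are assumptions made for illustration; the real controller maps each type to a Feign call and a target document class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the controller's strategy map.
class InitDispatchSketch {

    // type -> loader; each loader takes a page number and returns that page of rows
    private static final Map<String, Function<Integer, List<String>>> STRATEGIES = new HashMap<>();

    static {
        // Fake loaders that only echo the page number (assumption for the example)
        STRATEGIES.put("user", page -> Collections.singletonList("user-page-" + page));
        STRATEGIES.put("travel", page -> Collections.singletonList("travel-page-" + page));
    }

    // Looks up the strategy for the given type and runs it; unknown types are rejected
    static List<String> load(String type, int page) {
        Function<Integer, List<String>> strategy = STRATEGIES.get(type);
        if (strategy == null) {
            throw new IllegalArgumentException("unknown init type: " + type);
        }
        return strategy.apply(page);
    }
}
```

Adding a new data set then only requires registering one more entry in the map; the dispatch code itself stays unchanged.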

To protect the endpoint, a secret key is carried in the URL path, and its expected value is kept in the configuration file.

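Use Redis to ensure that the endpoint can be run only once for each data type: SET NX writes the flag only if it does not already exist, which makes the call idempotent.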
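The "write only if absent" behaviour of SET NX can be illustrated without a Redis server. The sketch below uses an in-memory ConcurrentHashMap as a stand-in; the class SetNxSketch and its methods are hypothetical and are not the project's RedisService.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for Redis, used only to illustrate SET NX semantics.
class SetNxSketch {

    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Mirrors SET key value NX: writes only when the key is absent and
    // reports whether this call was the one that wrote it.
    boolean setnx(String key, String value) {
        return store.putIfAbsent(key, value) == null;
    }

    // Returns true only for the first caller of a given type.
    boolean tryAcquireInit(String type) {
        return setnx("es:init:" + type, "initialized");
    }
}
```

Because the check and the write happen in a single atomic step, two concurrent requests for the same type cannot both pass the guard.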

ElasticsearchDataInitController
@Slf4j
@RestController
@RequestMapping("/init")
@RefreshScope
public class ElasticsearchDataInitController {

    public static final String INIT_USER = "user";
    public static final String INIT_TRAVEL = "travel";
    public static final String INIT_STRATEGY = "strategy";
    public static final String INIT_DESTINATION = "destination";
    public static final Integer BATCH_COUNT = 200;
    // Maps the {type} path variable to the strategy that loads that data set
    private final Map<String, EsDataInitStrategy> DATA_HANDLER_STRATEGY_MAP = new HashMap<>();

    // Secret expected in the URL path, read from the configuration center
    @Value("${es.init.key}")
    private String initKey;
    private final RedisService redisService;
    private final UserInfoFeignService userInfoFeignService;
    private final ArticleFeignService articleFeignService;
    private final ElasticsearchService elasticsearchService;

    public ElasticsearchDataInitController(RedisService redisService, UserInfoFeignService userInfoFeignService, ArticleFeignService articleFeignService, ElasticsearchService elasticsearchService) {
        this.redisService = redisService;
        this.userInfoFeignService = userInfoFeignService;
        this.articleFeignService = articleFeignService;
        this.elasticsearchService = elasticsearchService;
    }

    // One strategy = the remote query to run + the ES document class to convert each row into
    @Getter
    @Setter
    static class EsDataInitStrategy {
        private Function<QueryObject, R<List<Object>>> function;
        private Class<?> clazz;

        public EsDataInitStrategy(Function<QueryObject, R<List<Object>>> function, Class<?> clazz) {
            this.function = function;
            this.clazz = clazz;
        }
    }

    @PostConstruct
    public void postConstruct() {
        // User initialization
        EsDataInitStrategy userInit = new EsDataInitStrategy((qo) -> userInfoFeignService.findList(qo.getCurrent(), qo.getSize()), UserInfoEs.class);
        DATA_HANDLER_STRATEGY_MAP.put(INIT_USER, userInit);
        // Strategy (travel guide) article initialization
        EsDataInitStrategy strategyInit = new EsDataInitStrategy(articleFeignService::strategySearch, StrategyEs.class);
        DATA_HANDLER_STRATEGY_MAP.put(INIT_STRATEGY, strategyInit);
        // Travel note article initialization
        EsDataInitStrategy travelInit = new EsDataInitStrategy(articleFeignService::travelSearch, TravelEs.class);
        DATA_HANDLER_STRATEGY_MAP.put(INIT_TRAVEL, travelInit);
        // Destination initialization
        EsDataInitStrategy destinationInit = new EsDataInitStrategy(articleFeignService::destinationSearch, DestinationEs.class);
        DATA_HANDLER_STRATEGY_MAP.put(INIT_DESTINATION, destinationInit);
    }

    @GetMapping("/{key}/{type}")
    public ResponseEntity<?> init(@PathVariable("key") String key, @PathVariable("type") String type) {
        log.info("[ES data init] -------------------- initialization started --------------------");
        if (StringUtils.isEmpty(key) || !initKey.equals(key)) {
            // Log only what the caller sent; the configured secret must not end up in the logs
            log.warn("[ES data init] illegal request, bad parameters: key={}, type={}", key, type);
            return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
        }
        // Once a type has been initialized, further calls are rejected
        String redisKey = "es:init:" + key + ":" + type;
        Boolean ret = redisService.setnx(redisKey, "initialized");
        if (ret == null || !ret) {
            log.warn("[ES data init] illegal request, already initialized: type={}, ret={}", type, ret);
            return ResponseEntity.status(HttpStatus.NOT_FOUND).build();
        }

        // Start loading the data
        this.doInit(type);
        log.info("[ES data init] -------------------- initialization finished --------------------");
        return ResponseEntity.ok().body("init success");
    }

    // Pulls the data page by page and saves each page, stopping at the first empty page
    private void doInit(String type) {
        int current = 1;
        do {
            List<Object> list = handleRemoteDataList(current++, type);
            if (list == null || list.isEmpty()) {
                log.info("[ES data init] no more data, import finished.");
                return;
            }
            elasticsearchService.save(list);
        } while (true);
    }

    /**
     * Fetches one page of remote data and converts it into ES document objects
     */
    private List<Object> handleRemoteDataList(Integer current, String type) {
        EsDataInitStrategy strategy = DATA_HANDLER_STRATEGY_MAP.get(type);
        if (strategy == null) {
            throw new BizException("invalid initialization type");
        }
        R<List<Object>> ret = strategy.getFunction().apply(new QueryObject(current, BATCH_COUNT));
        log.info("[ES data init] querying {} data, data={}", type, JSON.toJSONString(ret));
        List<Object> list = ret.checkAndGet();
        if (list == null || list.isEmpty()) {
            return list;
        }
        List<Object> dataList = new ArrayList<>(list.size());
        try {
            Class<?> clazz = strategy.getClazz();
            for (Object dto : list) {
                Object es = clazz.getDeclaredConstructor().newInstance();
                // Apache Commons BeanUtils: the first argument is the destination, the second the source
                BeanUtils.copyProperties(es, dto);
                dataList.add(es);
            }
            return dataList;
        } catch (Exception e) {
            // Fail loudly: returning null here would make doInit treat the error as "no more data"
            log.error("[ES data init] failed to convert {} data", type, e);
            throw new BizException("failed to convert data for type: " + type);
        }
    }
}
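The loop in doInit is a plain "fetch pages until one comes back empty" pattern. A self-contained sketch of that pattern follows; PagedImportSketch, importAll and pageOf are names invented for this example, and the in-memory slicing only imitates what the "limit offset, size" queries return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

// Hypothetical helper illustrating the page-by-page import loop.
class PagedImportSketch {

    // Requests pages 1, 2, 3, ... and hands each non-empty page to the sink.
    // Stops at the first null or empty page and returns the number of rows processed.
    static <T> int importAll(IntFunction<List<T>> fetchPage, Consumer<List<T>> sink) {
        int total = 0;
        for (int page = 1; ; page++) {
            List<T> rows = fetchPage.apply(page);
            if (rows == null || rows.isEmpty()) {
                return total;
            }
            sink.accept(rows);
            total += rows.size();
        }
    }

    // Slices an in-memory list the way "limit offset, size" would,
    // with offset = (page - 1) * size for a 1-based page number.
    static <T> List<T> pageOf(List<T> all, int page, int size) {
        int from = Math.min((page - 1) * size, all.size());
        int to = Math.min(from + size, all.size());
        return new ArrayList<>(all.subList(from, to));
    }
}
```

One consequence of this stop condition is that a final, empty request is always made after the last page of data.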

Nacos configuration

In the Nacos configuration center, open search-service-dev.yaml and add the following configuration:

es:
  init:
    key: hg345f67gfdh5yth34g56

Remote calls

Create the package com.swx.search.feign and, inside it, the interface ArticleFeignService for remote calls to the article microservice.

In the trip-article-server module, add a query method to DestinationController.

DestinationController
@PostMapping("/search")
public R<List<Destination>> forSearchService(@RequestBody QueryObject qo) {
    return R.ok(destinationService.list(Wrappers.<Destination>query().last("limit " + qo.getOffset() + ", " + qo.getSize())));
}

In the trip-article-server module, add a query method to StrategyController.

StrategyController
@PostMapping("/search")
public R<List<Strategy>> forSearchService(@RequestBody QueryObject qo) {
    return R.ok(strategyService.list(Wrappers.<Strategy>query().last("limit " + qo.getOffset() + ", " + qo.getSize())));
}

In the trip-article-server module, add a query method to TravelController.

TravelController
@PostMapping("/search")
public R<List<Travel>> forSearchService(@RequestBody QueryObject qo) {
    return R.ok(travelService.list(Wrappers.<Travel>query().last("limit " + qo.getOffset() + ", " + qo.getSize())));
}

ArticleFeignService
@FeignClient("article-service")
public interface ArticleFeignService {

    // Must be POST: the provider endpoint is a @PostMapping and the query travels in the request body
    @PostMapping("/destinations/search")
    R<List<Object>> destinationSearch(@RequestBody QueryObject qo);

    @PostMapping("/strategies/search")
    R<List<Object>> strategySearch(@RequestBody QueryObject qo);

    @PostMapping("/travels/search")
    R<List<Object>> travelSearch(@RequestBody QueryObject qo);
}

Create the interface UserInfoFeignService for remote calls to the user microservice.

In the trip-user-server module, add a query method to UserInfoController.

UserInfoController
@GetMapping
public R<List<UserInfoDTO>> findList(Integer current, Integer limit) {
    // Convert the 1-based page number into a row offset
    int offset = (current - 1) * limit;
    List<UserInfo> list = userInfoService.list(Wrappers.<UserInfo>query().last("limit " + offset + ", " + limit));
    List<UserInfoDTO> dtoList = list.stream().map(UserInfo::toDto).collect(Collectors.toList());
    return R.ok(dtoList);
}

UserInfoFeignService
@FeignClient("user-service")
public interface UserInfoFeignService {
    /**
     * When the declared return type of a Feign call does not name a concrete class,
     * Feign converts each returned JSON object into a LinkedHashMap
     */
    @GetMapping("/users/")
    R<List<Object>> findList(@RequestParam("current") Integer current, @RequestParam("limit") Integer limit);
}

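When the element type received from a remote call is Object, Feign converts each returned JSON object into a LinkedHashMap. The property copy is done with BeanUtils, but org.springframework.beans.BeanUtils does not support copying from a Map into a bean.

for (Object dto : list) {
    Object es = clazz.getDeclaredConstructor().newInstance();
    BeanUtils.copyProperties(es, dto);
    dataList.add(es);
}

So the BeanUtils provided by Apache is used here instead. Note that its copyProperties takes the destination first and the source second, the reverse of the Spring version.

<dependency>
    <groupId>commons-beanutils</groupId>
    <artifactId>commons-beanutils</artifactId>
    <version>1.9.4</version>
</dependency>

When copying, it checks the type of the source object:

if (orig instanceof DynaBean) {
    ....;
} else if (orig instanceof Map) {
    ....;
} else {
    ....;
}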
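To make the Map branch concrete, here is a simplified illustration of what "copying a Map into a bean" means, written with the JDK's own java.beans introspection. It is not the library's actual code: MapToBeanSketch, fromMap and UserDoc are hypothetical names, and unlike Apache Commons BeanUtils this sketch performs no type conversion, so each map value must already have the property's type.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.util.Map;

// Simplified illustration of populating a bean from a Map by property name.
class MapToBeanSketch {

    // Hypothetical target type, standing in for a document class such as UserInfoEs.
    public static class UserDoc {
        private Long id;
        private String nickname;

        public Long getId() { return id; }
        public void setId(Long id) { this.id = id; }
        public String getNickname() { return nickname; }
        public void setNickname(String nickname) { this.nickname = nickname; }
    }

    // Creates a bean and fills every writable property that has a same-named map key.
    // Keys without a matching property are ignored.
    static <T> T fromMap(Map<String, ?> source, Class<T> type) {
        try {
            T target = type.getDeclaredConstructor().newInstance();
            for (PropertyDescriptor pd : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                Method setter = pd.getWriteMethod();
                if (setter != null && source.containsKey(pd.getName())) {
                    setter.invoke(target, source.get(pd.getName()));
                }
            }
            return target;
        } catch (ReflectiveOperationException | IntrospectionException e) {
            throw new IllegalStateException("cannot populate " + type.getName(), e);
        }
    }
}
```

In real Feign responses, numeric JSON values typically arrive as Integer or Long depending on their size, which is one reason a library with built-in conversion is the more practical choice here.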

Now call the following endpoints to run the initialization: create the indexes and import the data.

GET
http://localhost:9000/search/init/hg345f67gfdh5yth34g56/destination
http://localhost:9000/search/init/hg345f67gfdh5yth34g56/user
http://localhost:9000/search/init/hg345f67gfdh5yth34g56/travel
http://localhost:9000/search/init/hg345f67gfdh5yth34g56/strategy