ElasticsearchRestTemplate 的一些坑
基于日期的索引
预期单一索引增长速率较快,最终决定创建基于日期的索引(time-based indices),例如按月份分索引:
@Document(indexName = "my_index-#{@timeBasedIndexNameProvider.toMonth()}", shards = 3)
public class Entity {
// ...
}
其中 indexName 的值可包含 SpEL,引用了 TimeBasedIndexNameProvider 的 toMonth。
@Component("timeBasedIndexNameProvider")
public class TimeBasedIndexNameProvider {
public String toMonth() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM"));
}
}
生成的一系列索引名形式如下:
- my_index-2021-07
- my_index-2021-08
- ……
保存实体时,当前月份的索引可能还未创建,如果直接使用 ElasticsearchRestTemplate 的 save 方法,当前版本并不会解析实体类的实例字段上标注的 Elasticsearch 相关注解,例如有一个字段(batchId):
@Field(type = FieldType.Keyword)
private String batchId;
这种情况下,自动创建的 my_index-* 的 Mapping 所包含的 batchId 类型是 Text,而不是预期的 Keyword。一般来说,在保存实体之前,先检测当前月份的索引是否存在,若不存在,则创建索引(包括 Mapping),否则直接保存实体。
void createIndexAndPutMapping(Class<T> entityClass) {
IndexOperations ops = elasticsearchRestTemplate.indexOps(entityClass);
if (ops.exists()) {
return;
}
ops.create();
Document mapping = ops.createMapping(entityClass);
ops.putMapping(mapping);
}
上面的 createIndexAndPutMapping 线程不安全,多线程并发执行该方法时,不仅可能会看到下面的错误信息,而且 my_index-* 的 Mapping 也可能不符合预期。
mapper [batchId] of different type, current_type [text], merged_type [keyword]
既希望各个线程保存实体互不干扰,又不希望同步机制所引起的延迟,那么最好的方案是使用索引模板。
boolean putTemplate(Class<T> entityClass, String templateName, String... indexPatterns) {
IndexOperations ops = elasticsearchRestTemplate.indexOps(entityClass);
PutTemplateRequest request = PutTemplateRequest.builder(templateName, indexPatterns)
.withSettings(Document.from(ops.createSettings()))
.withMappings(Document.from(ops.createMapping()))
.build();
return ops.putTemplate(request);
}
构建模板时,指定索引的模式(indexPatterns),这里就是 my_index-*,之后所有名称满足该模式的索引创建时,它们的 Mapping 完全一致。注意,以上方法无需同步机制,但是要求单线程执行,例如在程序启动时由单线程执行 createIndexAndMapping 方法,再由多线程执行 ElasticsearchRestTemplate 的 save 方法。
实体内容太长
在日志中看到了这样的错误信息:
entity content is too long [499093206] for the configured buffer limit [104857600]
从 HttpAsyncResponseConsumerFactory 可以知道 Elasticsearch 客户端接收响应的缓冲区大小是 100 MB(104857600 bytes),虽然是硬编码,但是已经足够大到覆盖绝大多数场景,不如调查一下是否是业务逻辑代码的缺陷。
有可能是因为先过滤后聚合的查询没有限制记录条数,可以考虑设置 withPageable。
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(queryBuilder)
.withPageable(PageRequest.of(0, 1))
.addAggregation(aggregationBuilder)
.withIndicesOptions(IndicesOptions.lenientExpandOpen())
.build();
分组和排序以及 Top-N
例如,一个先过滤后聚合的查询:
- 按某一字段过滤。
- 按某一字段分组。
- 组内按某一字段排序。
- 每组只取前几个。
- 各组按某一字段排序。
Query DSL 如下所示:
{
"query": {
"bool": {
"must": [
{
"term": {
"eventId": {
"value": "AXso-TZMPxhCtoMLAWoN",
"boost": 1.0
}
}
}
],
"adjust_pure_negative": true,
"boost": 1.0
}
},
"aggs": {
"group-by-batchId": {
"terms": {
"field": "batchId",
"size": 10,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [
{
"min-createTime": "asc"
},
{
"_key": "asc"
}
]
},
"aggregations": {
"min-createTime": {
"min": {
"field": "createTime"
}
},
"topN": {
"top_hits": {
"from": 0,
"size": 1,
"version": false,
"seq_no_primary_term": false,
"explain": false,
"sort": [
{
"actionStartTime": {
"order": "asc"
}
}
]
}
}
}
}
}
}
这里关键在于分组之后,在每一组求用于各组排序的字段的最小值或最大值。
MinAggregationBuilder minAggBuilder = AggregationBuilders
.min(minAggName)
.field(orderByField);
termsAggBuilder.order(BucketOrder.aggregation(minAggName, orderByFieldAsc));
termsAggBuilder.subAggregation(minAggBuilder);
字段膨胀
Elasticsearch 限制客户端将过多键值对插入索引。
/**
* nodeId to taskId
*/
private Map<String, String> nodeId2TaskId;
上面是 Object 或者 Nested 的字段,随着插入次数增多,可能遇到异常:
Limit of total fields [1000] in index has been exceeded
如果不需要检索,那么优先考虑将字段类型设置为 Flattened。
@Field(type = FieldType.Flattened)
private Map<String, String> nodeId2TaskId;
本文首发于 https://h2cone.github.io/