ElasticsearchRestTemplate 的一些坑

Posted on Jul 26, 2021

基于日期的索引

预期单一索引增长速率较快,最终决定创建基于日期的索引(time-based indices),例如按月份分索引:

@Document(indexName = "my_index-#{@timeBasedIndexNameProvider.toMonth()}", shards = 3)
public class Entity {
    // ...
}

其中 indexName 的值可包含 SpEL,引用了 TimeBasedIndexNameProvider 的 toMonth。

@Component("timeBasedIndexNameProvider")
public class TimeBasedIndexNameProvider {

    public String toMonth() {
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM"));
    }
}

生成的一系列索引名形式如下:

  • my_index-2021-07
  • my_index-2021-08
  • ……

保存实体时,当前月份的索引可能还未创建,如果直接使用 ElasticsearchRestTemplate 的 save 方法,当前版本并不会解析实体类的实例字段上标注的 Elasticsearch 相关注解,例如有一个字段(batchId):

@Field(type = FieldType.Keyword)
private String batchId;

这种情况下,自动创建的 my_index-* 的 Mapping 所包含的 batchId 类型是 Text,而不是预期的 Keyword。一般来说,在保存实体之前,先检测当前月份的索引是否存在,若不存在,则创建索引(包括 Mapping),否则直接保存实体。

void createIndexAndPutMapping(Class<T> entityClass) {
    IndexOperations ops = elasticsearchRestTemplate.indexOps(entityClass);
    if (ops.exists()) {
        return;
    }
    ops.create();
    Document mapping = ops.createMapping(entityClass);
    ops.putMapping(mapping);
}

上面的 createIndexAndPutMapping 线程不安全,多线程并发执行该方法时,不仅可能会看到下面的错误信息,而且 my_index-* 的 Mapping 也可能不符合预期。

mapper [batchId] of different type, current_type [text], merged_type [keyword]

既希望各个线程保存实体互不干扰,又不希望同步机制所引起的延迟,那么最好的方案是使用索引模板

boolean putTemplate(Class<T> entityClass, String templateName, String... indexPatterns) {
    IndexOperations ops = elasticsearchRestTemplate.indexOps(entityClass);
    PutTemplateRequest request = PutTemplateRequest.builder(templateName, indexPatterns)
            .withSettings(Document.from(ops.createSettings()))
            .withMappings(Document.from(ops.createMapping()))
            .build();
    return ops.putTemplate(request);
}

构建模板时,指定索引的模式(indexPatterns),这里就是 my_index-*,之后所有名称满足该模式的索引创建时,它们的 Mapping 完全一致。注意,以上方法无需同步机制,但是要求单线程执行,例如在程序启动时由单线程执行 createIndexAndMapping 方法,再由多线程执行 ElasticsearchRestTemplate 的 save 方法。

实体内容太长

在日志中看到了这样的错误信息:

entity content is too long [499093206] for the configured buffer limit [104857600]

HttpAsyncResponseConsumerFactory 可以知道 Elasticsearch 客户端接收响应的缓冲区大小是 100 MB(104857600 bytes),虽然是硬编码,但是已经足够大到覆盖绝大多数场景,不如调查一下是否是业务逻辑代码的缺陷。

有可能是因为先过滤后聚合的查询没有限制记录条数,可以考虑设置 withPageable。

NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(queryBuilder)
                .withPageable(PageRequest.of(0, 1))
                .addAggregation(aggregationBuilder)
                .withIndicesOptions(IndicesOptions.lenientExpandOpen())
                .build();

分组和排序以及 Top-N

例如,一个先过滤后聚合的查询:

  1. 按某一字段过滤。
  2. 按某一字段分组。
  3. 组内按某一字段排序。
  4. 每组只取前几个。
  5. 各组按某一字段排序。

Query DSL 如下所示:

{
    "query": {
        "bool": {
            "must": [
                {
                    "term": {
                        "eventId": {
                            "value": "AXso-TZMPxhCtoMLAWoN",
                            "boost": 1.0
                        }
                    }
                }
            ],
            "adjust_pure_negative": true,
            "boost": 1.0
        }
    },
    "aggs": {
        "group-by-batchId": {
            "terms": {
                "field": "batchId",
                "size": 10,
                "min_doc_count": 1,
                "shard_min_doc_count": 0,
                "show_term_doc_count_error": false,
                "order": [
                    {
                        "min-createTime": "asc"
                    },
                    {
                        "_key": "asc"
                    }
                ]
            },
            "aggregations": {
                "min-createTime": {
                    "min": {
                        "field": "createTime"
                    }
                },
                "topN": {
                    "top_hits": {
                        "from": 0,
                        "size": 1,
                        "version": false,
                        "seq_no_primary_term": false,
                        "explain": false,
                        "sort": [
                            {
                                "actionStartTime": {
                                    "order": "asc"
                                }
                            }
                        ]
                    }
                }
            }
        }
    }
}

这里关键在于分组之后,在每一组求用于各组排序的字段的最小值或最大值。

MinAggregationBuilder minAggBuilder = AggregationBuilders
        .min(minAggName)
        .field(orderByField);
termsAggBuilder.order(BucketOrder.aggregation(minAggName, orderByFieldAsc));
termsAggBuilder.subAggregation(minAggBuilder);

字段膨胀

Elasticsearch 限制客户端将过多键值对插入索引。

/**
* nodeId to taskId
*/
private Map<String, String> nodeId2TaskId;

上面是 Object 或者 Nested 的字段,随着插入次数增多,可能遇到异常:

Limit of total fields [1000] in index has been exceeded

如果不需要检索,那么优先考虑将字段类型设置为 Flattened。

@Field(type = FieldType.Flattened)
private Map<String, String> nodeId2TaskId;

本文首发于 https://h2cone.github.io/

参考资料