search_after 遇上高频 refresh,你的分页还稳得住吗?

ElasticSearch 的 search_after API 是深度分页的首选方案,但它有一个鲜为人知的"阿喀琉斯之踵"——在高频 refresh 场景下,排序可能变得不稳定,导致翻页出现重复或遗漏。本文将深入剖析这一问题的根因,并引出 search_after + Point-in-Time (PIT) 的完整解决方案,最后结合 QueryBuilder 的代码实现,梳理从接口设计到底层查询的完整链路。

一、ElasticSearch 的分页困境

1.1 传统 from + size 的天花板

ElasticSearch 最基础的分页方式是 from + size

GET /my_index/_search
{
  "from": 10000,
  "size": 20,
  "query": { "match_all": {} },
  "sort": [{ "created_at": "desc" }]
}

这种方式的问题众所周知——深度分页性能急剧下降。当 from 值很大时,ES 需要在每个分片上取出 from + size 条文档,然后在协调节点上做全局排序后丢弃前 from 条。当 from = 10000, size = 20 时,每个分片实际要返回 10020 条文档给协调节点。

ES 默认将 from + size 的上限设为 10000(index.max_result_window),超过即报错。

1.2 search_after:基于游标的深度分页

search_after API 是 ES 官方推荐的深度分页方案。它的核心思想是:不再使用偏移量,而是使用上一页最后一条文档的排序值作为"游标",直接定位到下一页的起始位置

// 第一页
GET /my_index/_search
{
  "size": 20,
  "query": { "match_all": {} },
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }
  ]
}

// 第二页:使用第一页最后一条的 sort values
GET /my_index/_search
{
  "size": 20,
  "query": { "match_all": {} },
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }
  ],
  "search_after": ["2026-05-17T10:30:00Z", "doc_999"]
}

优势

  • 无论翻到多深,每次查询的开销都是恒定的(只取 size 条)
  • 不受 max_result_window 限制

但是search_after 有一个关键前提——它假设两次请求之间,索引的数据和排序是稳定的


二、高频 refresh 下的排序不稳定问题

2.1 问题根因

ElasticSearch 是一个近实时(Near Real-Time)搜索引擎。写入的文档不会立即可搜索,而是在 refresh 操作后才对搜索可见。默认的 refresh 间隔是 1 秒(index.refresh_interval)。

当你使用 search_after 进行跨请求分页时,两次请求之间可能发生以下情况:

  1. 新文档被 refresh 进来:如果新文档的排序值落在已翻过的页和当前页之间,会导致后续页的文档整体"后移",出现重复数据
  2. 旧文档被删除或更新:文档排序值变化后,可能导致某些文档被跳过
  3. 段合并(Segment Merge):后台的段合并可能改变文档的物理存储顺序,影响 _doc 等隐式排序

用一张时间线来说明:

请求1(第1页)                    请求2(第2页)
    │                                │
    ▼                                ▼
┌─────────┐    refresh 发生     ┌─────────┐
│ 文档A    │    ──────────►     │ 文档X(新)│  ← 新文档插入
│ 文档B    │                    │ 文档A    │  ← 文档A 重复出现!
│ 文档C    │                    │ 文档B    │
│ ...     │                     │ ...     │
└─────────┘                     └─────────┘

2.2 影响场景

这个问题在以下场景中尤为突出:

场景影响程度说明
数据导出🔴 严重全量遍历时数据重复或遗漏,导致导出结果不完整
后台管理列表翻页🟡 中等用户翻页时看到重复数据,体验差
移动端"加载更多"🟡 中等下拉加载时出现已看过的内容
实时数据流消费🔴 严重消费位点不准确,可能重复处理或丢失数据

2.3 为什么 _id 排序也救不了?

有人可能会想:加一个唯一的 _id 作为 tiebreaker 排序字段不就行了?

确实,_id 可以保证排序的唯一性,但它无法阻止新文档插入到已翻过的区间search_after 的语义是"给我排序值大于这个游标的文档"——如果 refresh 后出现了新的符合条件的文档,它们就会出现在结果中。

根本问题不在排序的唯一性,而在于查询视图的不一致性


三、Point-in-Time (PIT):冻结索引快照

3.1 PIT 是什么?

Point-in-Time (PIT) 是 ES 7.10+ 引入的特性,它允许你创建一个索引的轻量级快照。在 PIT 的生命周期内,所有搜索请求都基于同一个快照执行——即使期间发生了 refresh、文档更新或段合并,查询结果都不会受影响。

                    PIT 创建
                       │
                       ▼
    ┌──────────────────────────────────────┐
    │         索引快照(冻结视图)           │
    │                                      │
    │  请求1 ──► 基于快照查询 ──► 结果1      │
    │  请求2 ──► 基于快照查询 ──► 结果2      │  ← 数据一致!
    │  请求3 ──► 基于快照查询 ──► 结果3      │
    │                                      │
    └──────────────────────────────────────┘
                       │
                    PIT 关闭

3.2 PIT 的工作原理

  1. 创建 PIT:调用 POST /my_index/_pit?keep_alive=1m,ES 返回一个 pit_id
  2. 使用 PIT 查询:在搜索请求中携带 pit_id,此时不需要指定索引名(PIT 已绑定索引)
  3. 续期 PIT:每次查询时通过 keep_alive 参数续期,ES 返回可能更新的 pit_id
  4. 关闭 PIT:遍历完成后主动关闭,释放 ES 资源

3.3 search_after + PIT = 完美分页

search_after 与 PIT 结合,就能实现既高效又一致的深度分页

// 1. 创建 PIT
POST /my_index/_pit?keep_alive=1m
// 返回: { "id": "46ToAwMDaWR5BXV1..." }

// 2. 第一页查询(携带 PIT)
GET /_search
{
  "size": 20,
  "query": { "match_all": {} },
  "pit": {
    "id": "46ToAwMDaWR5BXV1...",
    "keep_alive": "1m"
  },
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }
  ]
}

// 3. 第二页查询(search_after + PIT)
GET /_search
{
  "size": 20,
  "query": { "match_all": {} },
  "pit": {
    "id": "46ToAwMDaWR5BXV1...",  // 使用上一次返回的 pit_id
    "keep_alive": "1m"
  },
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }
  ],
  "search_after": ["2026-05-17T10:30:00Z", "doc_999"]
}

// 4. 遍历完成后关闭 PIT
DELETE /_pit
{ "id": "46ToAwMDaWR5BXV1..." }

四、QueryBuilder 中的实现:代码链路全梳理

QueryBuilder 是一个支持多数据源的 Go 查询构建器库。在最新版本中,ElasticSearchBuilder 已全面引入 search_after + PIT 方案。下面我们从接口设计到底层执行,逐层梳理整个链路。

4.1 新增的数据结构与字段

首先,ElasticSearchBuilder 新增了 pitKeepAlivepitID 两个字段,分别用于控制 PIT 保活时长和支持跨请求的 PIT 分页:

type ElasticSearchBuilder[R any] struct {
    builder[*ElasticSearchBuilder[R], R]
    index        string
    filter       elastic.Query
    sort         []elastic.Sorter
    pitKeepAlive time.Duration // 新增:Point-in-Time 保持时间
    pitID        string        // 新增:外部传入/内部更新的 PIT ID(用于跨请求分页)
}

同时新增了 ESPITPageResult 结构体,作为 PIT 分页查询的返回结果:

// ESPITPageResult ES PIT 分页查询结果。
// PitID 与 CursorValues 可用于下一批查询。
type ESPITPageResult[R any] struct {
    List         []*R    // 当前页数据
    Total        int64   // 总数(仅首页查询时返回)
    HasMore      bool    // 是否还有下一页
    PitID        string  // 下一页使用的 PIT ID
    CursorValues []any   // 下一页使用的游标值
}

以及新增的错误定义:

// ErrPITCursorWithoutPITID ElasticSearch 单批次分页查询模式下未提供 PIT ID 的错误
ErrPITCursorWithoutPITID = errors.New("PIT ID is required when cursor values are provided")

这个错误用于防御性校验——如果调用方提供了 cursorValues(说明不是第一页),但没有提供 pitID,说明 PIT 会话已丢失,此时应该拒绝查询而非静默返回错误数据。

4.2 新增 API:SetPitKeepAliveSetPITIDQueryPageWithPIT

SetPitKeepAlive:设置 PIT 的保活时长,控制 ES 快照的存活窗口:

func (e *ElasticSearchBuilder[R]) SetPitKeepAlive(keepAlive time.Duration) *ElasticSearchBuilder[R] {
    e.pitKeepAlive = keepAlive
    return e
}

如果不设置,默认使用 1 分钟(由 pitKeepAliveString() 内部兜底)。对于耗时较长的遍历或翻页间隔较大的场景,建议适当调大此值(如 2 * time.Minute),避免 PIT 在两次请求之间过期。

SetPITID:允许外部传入 PIT ID,用于跨请求续查:

func (e *ElasticSearchBuilder[R]) SetPITID(pitID string) *ElasticSearchBuilder[R] {
    e.pitID = pitID
    return e
}

QueryPageWithPIT:核心新增方法,执行基于 PIT + search_after 的单批次分页查询:

func (e *ElasticSearchBuilder[R]) QueryPageWithPIT(ctx context.Context) (*ESPITPageResult[R], error) {
    if err := e.builder.prepareAndValidate(); err != nil {
        return nil, err
    }
    if e.index == "" {
        return nil, errors.New("elasticsearch index not configured")
    }
    // 防御性校验:有游标值但没有 PIT ID,说明会话已丢失
    if len(e.builder.cursorValues) > 0 && e.pitID == "" {
        return nil, ErrPITCursorWithoutPITID
    }

    // 临时覆盖分页标志,确保 QueryMeta 和执行语义一致
    oldNeedPagination := e.builder.needPagination
    oldIsCursorQuery := e.builder.isCursorQuery
    oldPitID := e.pitID
    defer func() {
        e.builder.needPagination = oldNeedPagination
        e.builder.isCursorQuery = oldIsCursorQuery
        e.pitID = oldPitID
    }()

    e.builder.needPagination = true
    e.builder.isCursorQuery = true

    isFirstBatch := len(e.builder.cursorValues) == 0

    var (
        nextCursorValues []any
        hasMore          bool
        resultPitID      string
    )

    // 通过中间件管道执行查询
    list, total, err := e.builder.executeWithMiddlewares(ctx, func(ctx context.Context) ([]*R, int64, error) {
        batchList, batchNextCursorValues, batchTotal, batchHasMore, queryErr := e.doCursorQuery(
            ctx, e.builder.cursorValues, isFirstBatch, true, &e.pitID,
        )
        if queryErr != nil {
            return nil, 0, queryErr
        }
        nextCursorValues = batchNextCursorValues
        hasMore = batchHasMore
        resultPitID = e.pitID
        return batchList, batchTotal, nil
    })
    if err != nil {
        return nil, err
    }

    result := &ESPITPageResult[R]{
        List:         list,
        Total:        total,
        HasMore:      hasMore,
        PitID:        resultPitID,
        CursorValues: nextCursorValues,
    }

    // 最后一页:清空 PIT 信息,提示调用方无需继续
    if !hasMore {
        result.PitID = ""
        result.CursorValues = nil
    }

    return result, nil
}

设计要点

  1. 临时覆盖 + defer 恢复QueryPageWithPIT 临时将 needPaginationisCursorQuery 设为 true,确保 QueryMeta 中的元信息与实际执行语义一致(中间件可以正确感知这是一次游标分页查询),查询完成后通过 defer 恢复原值,不影响构建器的后续使用。
  2. 中间件管道复用:通过 executeWithMiddlewares 执行,意味着缓存、日志、指标等中间件对 PIT 查询同样生效。
  3. 最后一页自动清理:当 hasMorefalse 时,自动清空返回结果中的 PitIDCursorValues,向调用方明确传达"没有下一页了"的语义。

4.3 底层引擎重构:doCursorQuery 的演进

doCursorQuery 是整个游标分页的核心执行引擎。在引入 PIT 之前,它是一个纯粹的 search_after 查询函数,不涉及任何 PIT 管理。这次重构涉及签名变更、PIT 管理策略和 hasMore 判断逻辑三个维度。

签名变更

// 旧版(无 PIT 支持)
func (e *ElasticSearchBuilder[R]) doCursorQuery(
    ctx context.Context, cursorValues []any, isFirstBatch bool,
) ([]*R, []any, int64, error)

// 新版(PIT + hasMore)
func (e *ElasticSearchBuilder[R]) doCursorQuery(
    ctx context.Context, cursorValues []any,
    isFirstBatch, forcePIT bool,
    pitID *string,
) (list []*R, nextCursorValues []any, total int64, hasMore bool, err error)

三个关键变化:

  • 新增 pitID *string 参数:用于在多批次查询间传递和更新 PIT ID
  • 新增 forcePIT 参数:区分"全量遍历自动 PIT"和"跨请求强制 PIT"两种场景
  • 新增 hasMore 返回值:用于 QueryPageWithPIT 判断是否还有下一页

从直连索引到 PIT 快照

旧版 doCursorQuery 直接通过索引名查询,没有任何快照保障:

// 旧版:直接使用 Index 查询,无 PIT 支持
searchService := e.builder.data.ElasticSearch.Search().
    Index(e.index).
    Query(filter).
    Size(batchSize)

新版引入了 forcePITusePIT 的双层控制:

querySize := batchSize
if forcePIT {
    // PIT 分页场景通过多取 1 条记录来判断是否还有下一页
    querySize = batchSize + 1
}

usePIT := forcePIT || !e.builder.needPagination
  • forcePIT = true(来自 QueryPageWithPIT):强制使用 PIT,并多取 1 条用于 hasMore 探测
  • !needPagination(来自 QueryCursor 全量遍历):自动启用 PIT 以保证遍历一致性
  • 两者都为 false 时,退化为旧版行为——直接使用 Index(e.index) 查询

PIT 生命周期管理

usePIT := forcePIT || !e.builder.needPagination
openedPITByThisCall := false

defer func() {
    // 如果本次调用打开了 PIT 但查询失败,主动关闭以避免资源泄漏
    if err != nil && openedPITByThisCall && pitID != nil && *pitID != "" {
        e.closePIT(*pitID)
        *pitID = ""
    }
}()

if usePIT {
    if pitID == nil {
        return nil, nil, 0, false, errors.New("pitID pointer is nil")
    }
    if *pitID == "" {
        // 首次调用:自动打开 PIT
        openResp, err := e.builder.data.ElasticSearch.OpenPointInTime(e.index).
            KeepAlive(e.pitKeepAliveString()).Do(ctx)
        if err != nil {
            return nil, nil, 0, false, err
        }
        *pitID = openResp.Id
        openedPITByThisCall = true
    }
    // 使用 PIT 查询时不指定索引名(PIT 已绑定索引)
    searchService = searchService.PointInTime(
        elastic.NewPointInTimeWithKeepAlive(*pitID, e.pitKeepAliveString()),
    )
} else {
    searchService = searchService.Index(e.index)
}

关键设计

  • openedPITByThisCall 标记用于错误回滚——如果是本次调用打开的 PIT,但后续查询失败了,需要主动关闭以避免 ES 资源泄漏
  • PIT 查询时不指定索引名,因为 PIT 已经绑定了索引

hasMore 探测与结果裁剪

hits := searchResult.Hits.Hits
hasMore = forcePIT && len(hits) > batchSize

effectiveHits := hits
if hasMore {
    effectiveHits = hits[:batchSize]  // 裁掉多取的那 1 条
}

list = make([]*R, 0, len(effectiveHits))
for _, hit := range effectiveHits {
    var item R
    if err := json.Unmarshal(hit.Source, &item); err != nil {
        return nil, nil, 0, false, err
    }
    list = append(list, &item)
}

这是一个经典的 "多取一条"探测模式:请求 batchSize + 1 条数据,如果实际返回了 batchSize + 1 条,说明还有下一页;否则当前就是最后一页。返回给调用方时裁掉多取的那一条。

最后一页自动关闭 PIT

if forcePIT && !hasMore && *pitID != "" {
    e.closePIT(*pitID)
    *pitID = ""
}

forcePIT 模式下检测到最后一页时,自动关闭 PIT 释放 ES 资源。这确保了即使调用方忘记手动关闭,PIT 也不会一直占用资源。

4.4 新增 closePIT:PIT 资源释放

旧版 ElasticSearchBuilder 没有 PIT 相关逻辑,自然也没有 closePIT 方法。新版新增了这个方法,负责在 PIT 不再需要时主动释放 ES 资源:

const esPITCloseTimeout = 3 * time.Second

func (e *ElasticSearchBuilder[R]) closePIT(pitID string) {
    if pitID == "" {
        return
    }
    closeCtx, cancel := context.WithTimeout(context.Background(), esPITCloseTimeout)
    defer cancel()
    _, _ = e.builder.data.ElasticSearch.ClosePointInTime(pitID).Do(closeCtx)
}

设计要点

  • 超时时间提取为常量 esPITCloseTimeout,提高可维护性
  • 使用独立的 context.Background() 创建超时上下文,避免受调用方 context 取消的影响
  • 忽略关闭错误(_, _),因为 PIT 关闭失败不应影响业务流程,ES 会在 keep_alive 过期后自动回收
  • closePIT 的调用时机由上层精确控制:QueryCursor 通过 defer 在迭代结束后关闭,doCursorQuery 在最后一页自动关闭,错误时通过 openedPITByThisCall 标记回滚

4.5 QueryCursor 的重构

旧版 QueryCursor 直接将 doCursorQuery 作为回调传入,因为两者签名完全匹配:

// 旧版:签名匹配,直接传入
func (e *ElasticSearchBuilder[R]) QueryCursor(ctx context.Context) iter.Seq2[*R, error] {
    // ...
    return e.builder.executeCursorWithMiddlewares(ctx, e.doCursorQuery)
}

新版由于 doCursorQuery 签名变更(新增 forcePITpitIDhasMore),需要通过包装函数适配,同时引入 PIT 生命周期管理:

// 新版:包装适配 + PIT 生命周期管理
func (e *ElasticSearchBuilder[R]) QueryCursor(ctx context.Context) iter.Seq2[*R, error] {
    // ...
    var pitID string
    wrappedCursorQuery := func(ctx context.Context, cursorValues []any, isFirstBatch bool) ([]*R, []any, int64, error) {
        // forcePIT = false:由 needPagination 控制是否启用 PIT
        list, nextCursorValues, total, _, err := e.doCursorQuery(ctx, cursorValues, isFirstBatch, false, &pitID)
        return list, nextCursorValues, total, err
    }
    innerIter := e.builder.executeCursorWithMiddlewares(ctx, wrappedCursorQuery)
    return func(yield func(*R, error) bool) {
        defer func() {
            e.closePIT(pitID)  // 迭代结束后关闭 PIT
        }()
        for item, err := range innerIter {
            if !yield(item, err) {
                return
            }
        }
    }
}

关键变化

  • 通过 wrappedCursorQuery 闭包适配新签名,传入 forcePIT = false 和局部 pitID 变量
  • hasMore 返回值用 _ 丢弃,因为全量遍历模式下,是否继续由"本批次是否返回了数据"决定
  • 新增 defer e.closePIT(pitID),确保迭代结束(正常完成或提前 break)后自动释放 PIT 资源

4.6 结果解析的重构:从回调内解析到统一后处理

旧版 doCursorQuery 在搜索回调内部直接解析 hits 并追加到 list

// 旧版:在搜索回调内部解析
if err := util.WaitAndGo(func() error {
    searchResult, err = searchService.Do(ctx)
    if err != nil {
        return err
    }
    // 解析与搜索耦合在同一个回调中
    for _, hit := range searchResult.Hits.Hits {
        var item R
        if err := json.Unmarshal(hit.Source, &item); err != nil {
            return err
        }
        list = append(list, &item)
    }
    return nil
}, func() error {
    // Count 查询...
})

新版将搜索和解析分离——回调只负责执行搜索和更新 PIT ID,hits 的解析和裁剪在并行任务完成后统一处理:

// 新版:搜索与解析分离
if err = util.WaitAndGo(func() error {
    searchResult, err = searchService.Do(ctx)
    if err != nil {
        return err
    }
    // 只更新 PIT ID,不解析 hits
    if usePIT && *pitID != "" && searchResult.PitId != "" {
        *pitID = searchResult.PitId
    }
    return nil
}, func() error {
    // Count 查询...
})

// 并行任务完成后,统一处理 hits
hits := searchResult.Hits.Hits
hasMore = forcePIT && len(hits) > batchSize

effectiveHits := hits
if hasMore {
    effectiveHits = hits[:batchSize]  // 裁掉探测用的多余 1 条
}

list = make([]*R, 0, len(effectiveHits))
for _, hit := range effectiveHits {
    var item R
    if err := json.Unmarshal(hit.Source, &item); err != nil {
        return nil, nil, 0, false, err
    }
    list = append(list, &item)
}

这种分离设计使得 hasMore 探测和结果裁剪逻辑可以在解析前完成,避免先解析再丢弃的浪费。

此外,Count 查询的触发条件也做了修正。旧版有一个额外的 e.afterHook == nil 条件:

// 旧版
if !isFirstBatch || !e.builder.needTotal || e.afterHook == nil {
    return nil
}

新版移除了这个条件:

// 新版
if !isFirstBatch || !e.builder.needTotal {
    return nil
}

这是因为 needTotal 本身就是控制是否查询总数的开关,不应该与 afterHook 耦合。即使没有设置 AfterQueryHook,如果用户明确要求了 needTotal,总数也应该被查询并返回。

4.7 Clone 的完善

Clone 方法也同步拷贝了新增的 pitID 字段:

func (e *ElasticSearchBuilder[R]) Clone() *ElasticSearchBuilder[R] {
    cloned := &ElasticSearchBuilder[R]{
        index:        e.index,
        filter:       e.filter,
        pitKeepAlive: e.pitKeepAlive, // 新增:拷贝 PIT 保活时长
        pitID:        e.pitID,        // 新增:拷贝 PIT ID
    }
    // ...
}

4.8 完整调用链路图

┌─────────────────────────────────────────────────────────────────┐
│                        业务层                                    │
│                                                                 │
│  场景A: 全量遍历(数据导出)          场景B: 跨请求分页(API 翻页) │
│  ┌──────────────────────┐          ┌──────────────────────────┐ │
│  │ b.SetNeedPagination  │          │ b.SetPITID(prevPitID)    │ │
│  │   (false)            │          │ b.SetCursorValue(...)    │ │
│  │ b.QueryCursor(ctx)   │          │ b.QueryPageWithPIT(ctx)  │ │
│  └──────────┬───────────┘          └──────────┬───────────────┘ │
└─────────────┼──────────────────────────────────┼────────────────┘
              │                                  │
              ▼                                  ▼
┌─────────────────────────┐    ┌──────────────────────────────────┐
│    QueryCursor          │    │      QueryPageWithPIT            │
│                         │    │                                  │
│  forcePIT = false       │    │  forcePIT = true                 │
│  usePIT = !needPaginate │    │  usePIT = true (强制)            │
│  hasMore: 忽略          │    │  hasMore: 多取1条探测             │
│  PIT关闭: defer         │    │  PIT关闭: 最后一页自动关闭         │
└──────────┬──────────────┘    └──────────┬───────────────────────┘
           │                              │
           └──────────┬───────────────────┘
                      ▼
         ┌────────────────────────┐
         │     doCursorQuery      │
         │                        │
         │  1. 按需打开 PIT        │
         │  2. 构建 search_after   │
         │  3. 并行: 查询 + Count  │
         │  4. 解析 hits           │
         │  5. 提取 sort values    │
         │  6. 按需关闭 PIT        │
         └────────────────────────┘

五、使用示例

5.1 全量遍历(数据导出场景)

b := builder.NewElasticSearchBuilder[Doc](
    builder.NewDBProxy(nil, nil, esClient), "my_index",
)
b.SetFilter(elastic.NewTermQuery("status", "active"))
b.SetCursorField("created_at")
b.SetSort(elastic.NewFieldSort("_id").Asc())
b.SetLimit(500)
b.SetNeedPagination(false)          // 关闭分页 → 自动启用 PIT
b.SetPitKeepAlive(2 * time.Minute)  // PIT 保活时长

for doc, err := range b.QueryCursor(ctx) {
    if err != nil {
        log.Printf("遍历错误: %v", err)
        break
    }
    export(doc)
}
// PIT 在迭代结束后自动关闭(defer)

5.2 跨请求分页(API 翻页场景)

// 首页请求
es := builder.NewElasticSearchBuilder[Doc](
    builder.NewDBProxy(nil, nil, esClient), "my_index",
)
es.SetFilter(elastic.NewMatchAllQuery()).
    SetCursorField("created_at", "id").
    SetLimit(20)

page, err := es.QueryPageWithPIT(ctx)
// 持久化 page.PitID + page.CursorValues → 编码为 page_token 返回给客户端

// 下一页请求:从 page_token 中解码出 pitID 和 cursorValues
es.SetPITID(prevPitID).SetCursorValue(prevCursorValues...)
page, err = es.QueryPageWithPIT(ctx)

// 最后一页:page.HasMore == false, page.PitID == ""
// PIT 已自动关闭,无需手动处理

后端 API 业务层数据组装参考

在实际的后端 API 中,QueryPageWithPIT 返回的 ESPITPageResult 需要组装为前后端约定的分页协议。推荐的接口契约如下:

请求参数

字段类型说明
page_sizeinteger每页条数
page_tokenstring不透明的分页令牌(首页为空)

响应参数

字段类型说明
itemsarray当前页数据
next_page_tokenstring下一页令牌(无更多数据时为空)
has_moreboolean是否还有下一页

page_token 生成策略

page_token 本质上是对 pit_id + cursor_values 的序列化封装。推荐的生成流程:

  1. 构建载荷{"pit_id":"...","cursor_values":[...],"exp":...,"v":1}
  2. 序列化并编码:JSON 序列化后进行 Base64URL 编码
  3. 完整性保护:根据安全需求添加 HMAC 签名或 AES-GCM 加密
  4. 请求时校验:每次请求解码 page_token 后,校验版本号、过期时间、签名,通过后再调用 SetPITID + SetCursorValue
// page_token 载荷结构示例
type PageToken struct {
    PitID        string `json:"pit_id"`
    CursorValues []any  `json:"cursor_values"`
    Exp          int64  `json:"exp"` // 过期时间戳
    Version      int    `json:"v"`   // 版本号,用于兼容性控制
}

// 编码:ESPITPageResult → page_token
func EncodePageToken(result *builder.ESPITPageResult[Doc]) string {
    if !result.HasMore {
        return "" // 最后一页,无需生成 token
    }
    token := PageToken{
        PitID:        result.PitID,
        CursorValues: result.CursorValues,
        Exp:          time.Now().Add(5 * time.Minute).Unix(),
        Version:      1,
    }
    data, _ := json.Marshal(token)
    // 实际生产中应添加 HMAC 签名或加密
    return base64.RawURLEncoding.EncodeToString(data)
}

// 解码:page_token → pitID + cursorValues
func DecodePageToken(tokenStr string) (*PageToken, error) {
    data, err := base64.RawURLEncoding.DecodeString(tokenStr)
    if err != nil {
        return nil, fmt.Errorf("invalid page token: %w", err)
    }
    var token PageToken
    if err := json.Unmarshal(data, &token); err != nil {
        return nil, fmt.Errorf("invalid page token payload: %w", err)
    }
    // 校验版本号和过期时间
    if token.Version != 1 {
        return nil, fmt.Errorf("unsupported page token version: %d", token.Version)
    }
    if time.Now().Unix() > token.Exp {
        return nil, fmt.Errorf("page token expired")
    }
    return &token, nil
}

集成使用建议

  • PIT 有 keep_alive 窗口,如果 PIT 过期或失效,应从首页重新开始并创建新的 PIT
  • 保持稳定的排序键(例如:业务时间戳 + 唯一 ID),使 search_after 的结果具有确定性
  • HasMore 通过 limit+1 探测计算得出,可作为分页提示,但仍应以返回的 cursor/token 作为真实依据

六、总结

方案一致性深度分页资源开销适用场景
from + size❌ 不一致❌ 有上限高(深度越大越高)浅层分页
search_after❌ 不一致✅ 无上限低(恒定)单次请求内的游标遍历
search_after + PIT✅ 一致✅ 无上限低(恒定 + PIT 快照)跨请求分页、数据导出

search_after + PIT 方案已在 QueryBuilder 中完整实现,提供了两种使用模式:

  • QueryCursor:全量遍历迭代器,SetNeedPagination(false) 时自动启用 PIT,适用于数据导出等批处理场景
  • QueryPageWithPIT:单批次 PIT 分页查询,返回 ESPITPageResult(含 PitIDCursorValuesHasMore),适用于跨请求的 API 翻页场景

核心设计决策:

  1. forcePIT 双层控制:通过 forcePIT 参数区分"自动 PIT"和"强制 PIT"两种场景,复用同一个 doCursorQuery 处理
  2. 多取一条探测QueryPageWithPIT 通过请求 batchSize + 1 条数据来判断 hasMore,避免额外的 Count 查询
  3. PIT 生命周期自动管理:错误时回滚、最后一页自动关闭、迭代结束时 defer 关闭——三重保障防止资源泄漏
  4. 中间件管道复用QueryPageWithPIT 通过 executeWithMiddlewares 执行,日志、指标、缓存等中间件无缝生效
一句话总结search_after 解决了深度分页的性能问题,PIT 解决了跨请求的一致性问题——两者结合,才是 ElasticSearch 分页的终极方案。