search_after 做深度分页的同学,有没有遇到过翻页数据重复或莫名丢失的情况?如果你的索引 refresh 频率比较高,这个坑大概率踩过。本文聊聊这个问题的根因,以及如何用 search_after + Point-in-Time (PIT) 彻底解决它。最后会结合 QueryBuilder 的实际代码,把整条链路串一遍。

一、ElasticSearch 的分页困境

1.1 from + size 的老问题

ES 最基础的分页就是 from + size

GET /my_index/_search
{
  "from": 10000,
  "size": 20,
  "query": { "match_all": {} },
  "sort": [{ "created_at": "desc" }]
}

问题大家都清楚——翻得越深越慢。from = 10000, size = 20 时,每个分片要先取出 10020 条文档发给协调节点,协调节点再做全局排序、丢掉前 10000 条。这个开销随 from 线性增长,所以 ES 默认把 from + size 上限卡在 10000(index.max_result_window),超了直接报错。

1.2 search_after:游标式深度分页

search_after 是官方推荐的替代方案,思路很简单:不用偏移量了,拿上一页最后一条的排序值当游标,直接跳到下一页的起点。

// 第一页
GET /my_index/_search
{
  "size": 20,
  "query": { "match_all": {} },
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }
  ]
}

// 第二页:把第一页最后一条的 sort values 塞进来
GET /my_index/_search
{
  "size": 20,
  "query": { "match_all": {} },
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }
  ],
  "search_after": ["2026-05-17T10:30:00Z", "doc_999"]
}

好处很明显:

  • 不管翻到第几页,每次查询开销都是固定的(只取 size 条)
  • 不受 max_result_window 限制

但这里有个隐含前提——search_after 假设两次请求之间,索引的数据和排序是稳定的。一旦这个前提被打破,问题就来了。


二、高频 refresh 下的排序不稳定问题

2.1 问题根因

ES 是近实时(Near Real-Time)搜索引擎,写入的文档不会立即可搜索,要等 refresh 之后才对搜索可见。默认 refresh 间隔是 1 秒(index.refresh_interval)。

这就带来一个问题:用 search_after 做跨请求分页时,两次请求之间很可能发生 refresh,导致:

  1. 新文档被 refresh 进来——如果它的排序值恰好落在已翻过的页和当前页之间,后续页的文档整体"后移",你会看到重复数据
  2. 旧文档被删除或更新——排序值变了,某些文档直接被跳过
  3. 段合并(Segment Merge)——后台合并改变了文档物理存储顺序,影响 _doc 等隐式排序

画个时间线就很清楚了:

请求1(第1页)                    请求2(第2页)
    │                                │
    ▼                                ▼
┌─────────┐    refresh 发生     ┌─────────┐
│ 文档A    │    ──────────►     │ 文档X(新)│  ← 新文档插入
│ 文档B    │                    │ 文档A    │  ← 文档A 重复出现!
│ 文档C    │                    │ 文档B    │
│ ...     │                     │ ...     │
└─────────┘                     └─────────┘

2.2 影响场景

这个问题在以下场景里比较明显:

场景影响程度说明
数据导出🔴 严重全量遍历时数据重复或遗漏,导出结果不完整
后台管理列表翻页🟡 中等用户翻页看到重复数据,体验差
移动端"加载更多"🟡 中等下拉加载出现已看过的内容
实时数据流消费🔴 严重消费位点不准确,可能重复处理或丢失数据

2.3 为什么加 _id 排序也没用?

可能有人会想:加一个唯一的 _id 作为 tiebreaker 排序字段不就行了?

_id 确实能保证排序的唯一性,但它没法阻止新文档插入到已翻过的区间。search_after 的语义是"给我排序值大于这个游标的文档"——refresh 后如果出现了新的符合条件的文档,它们照样会出现在结果里。

所以根本问题不在排序是否唯一,而在于两次查询看到的数据视图不一致。


三、Point-in-Time (PIT):给索引拍个快照

3.1 PIT 是什么?

Point-in-Time (PIT) 是 ES 7.10 引入的特性。简单说,它能给索引创建一个轻量级快照——在这个快照的生命周期内,不管外面怎么 refresh、怎么写入新数据、怎么段合并,你的查询始终基于同一份数据视图。

                    PIT 创建
                       │
                       ▼
    ┌──────────────────────────────────────┐
    │         索引快照(冻结视图)           │
    │                                      │
    │  请求1 ──► 基于快照查询 ──► 结果1      │
    │  请求2 ──► 基于快照查询 ──► 结果2      │  ← 数据一致!
    │  请求3 ──► 基于快照查询 ──► 结果3      │
    │                                      │
    └──────────────────────────────────────┘
                       │
                    PIT 关闭

这就是上一章问题的解药——视图不变,排序自然稳定。

3.2 PIT 的工作流程

用起来也不复杂,四步走:

  1. 创建 PITPOST /my_index/_pit?keep_alive=1m,ES 返回一个 pit_id
  2. 带着 PIT 查询:搜索请求里携带 pit_id,这时候不需要再指定索引名了(PIT 已经绑定了索引)
  3. 续期:每次查询时带上 keep_alive 参数,ES 会返回可能更新过的 pit_id
  4. 关闭 PIT:用完了主动关闭,释放 ES 端的资源

3.3 search_after + PIT:组合起来

search_after 和 PIT 结合在一起,就能同时拿到深度分页的性能优势和数据一致性保障:

// 1. 创建 PIT
POST /my_index/_pit?keep_alive=1m
// 返回: { "id": "46ToAwMDaWR5BXV1..." }

// 2. 第一页查询(携带 PIT)
GET /_search
{
  "size": 20,
  "query": { "match_all": {} },
  "pit": {
    "id": "46ToAwMDaWR5BXV1...",
    "keep_alive": "1m"
  },
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }
  ]
}

// 3. 第二页查询(search_after + PIT)
GET /_search
{
  "size": 20,
  "query": { "match_all": {} },
  "pit": {
    "id": "46ToAwMDaWR5BXV1...",  // 使用上一次返回的 pit_id
    "keep_alive": "1m"
  },
  "sort": [
    { "created_at": "desc" },
    { "_id": "asc" }
  ],
  "search_after": ["2026-05-17T10:30:00Z", "doc_999"]
}

// 4. 遍历完成后关闭 PIT
DELETE /_pit
{ "id": "46ToAwMDaWR5BXV1..." }

四、QueryBuilder 中的实现:代码链路梳理

接下来结合 QueryBuilder 的代码,看看 search_after + PIT 方案落地时需要处理哪些细节。从接口设计到底层执行,逐层过一遍。

4.1 新增的数据结构与字段

先看结构体层面的变化。ElasticSearchBuilder 加了两个字段——pitKeepAlive 控制 PIT 保活时长,pitID 用于跨请求传递 PIT 会话:

type ElasticSearchBuilder[R any] struct {
    builder[*ElasticSearchBuilder[R], R]
    index        string
    filter       elastic.Query
    sort         []elastic.Sorter
    pitKeepAlive time.Duration // 新增:Point-in-Time 保持时间
    pitID        string        // 新增:外部传入/内部更新的 PIT ID(用于跨请求分页)
}

同时新增了 ESPITPageResult 结构体,作为 PIT 分页查询的返回结果:

// ESPITPageResult ES PIT 分页查询结果。
// PitID 与 CursorValues 可用于下一批查询。
type ESPITPageResult[R any] struct {
    List         []*R    // 当前页数据
    Total        int64   // 总数(仅首页查询时返回)
    HasMore      bool    // 是否还有下一页
    PitID        string  // 下一页使用的 PIT ID
    CursorValues []any   // 下一页使用的游标值
}

还有一个防御性的错误定义:

// ErrPITCursorWithoutPITID ElasticSearch 单批次分页查询模式下未提供 PIT ID 的错误
ErrPITCursorWithoutPITID = errors.New("PIT ID is required when cursor values are provided")

逻辑很简单:如果调用方传了 cursorValues(说明不是第一页),但没传 pitID,那 PIT 会话肯定丢了。这时候与其静默返回错误数据,不如直接报错让调用方知道该重新开始了。

4.2 新增 API:SetPitKeepAliveSetPITIDQueryPageWithPIT

SetPitKeepAlive 用来设置 PIT 的保活时长:

func (e *ElasticSearchBuilder[R]) SetPitKeepAlive(keepAlive time.Duration) *ElasticSearchBuilder[R] {
    e.pitKeepAlive = keepAlive
    return e
}

不设置的话默认 1 分钟(pitKeepAliveString() 内部兜底)。如果翻页间隔比较大,建议调大一些(比如 2 * time.Minute),免得 PIT 在两次请求之间过期。

SetPITID 允许外部传入 PIT ID,用于跨请求续查:

func (e *ElasticSearchBuilder[R]) SetPITID(pitID string) *ElasticSearchBuilder[R] {
    e.pitID = pitID
    return e
}

重头戏是 QueryPageWithPIT,执行基于 PIT + search_after 的单批次分页查询:

func (e *ElasticSearchBuilder[R]) QueryPageWithPIT(ctx context.Context) (*ESPITPageResult[R], error) {
    if err := e.builder.prepareAndValidate(); err != nil {
        return nil, err
    }
    if e.index == "" {
        return nil, errors.New("elasticsearch index not configured")
    }
    // 防御性校验:有游标值但没有 PIT ID,说明会话已丢失
    if len(e.builder.cursorValues) > 0 && e.pitID == "" {
        return nil, ErrPITCursorWithoutPITID
    }

    // 临时覆盖分页标志,确保 QueryMeta 和执行语义一致
    oldNeedPagination := e.builder.needPagination
    oldIsCursorQuery := e.builder.isCursorQuery
    oldPitID := e.pitID
    defer func() {
        e.builder.needPagination = oldNeedPagination
        e.builder.isCursorQuery = oldIsCursorQuery
        e.pitID = oldPitID
    }()

    e.builder.needPagination = true
    e.builder.isCursorQuery = true

    isFirstBatch := len(e.builder.cursorValues) == 0

    var (
        nextCursorValues []any
        hasMore          bool
        resultPitID      string
    )

    // 通过中间件管道执行查询
    list, total, err := e.builder.executeWithMiddlewares(ctx, func(ctx context.Context) ([]*R, int64, error) {
        batchList, batchNextCursorValues, batchTotal, batchHasMore, queryErr := e.doCursorQuery(
            ctx, e.builder.cursorValues, isFirstBatch, true, &e.pitID,
        )
        if queryErr != nil {
            return nil, 0, queryErr
        }
        nextCursorValues = batchNextCursorValues
        hasMore = batchHasMore
        resultPitID = e.pitID
        return batchList, batchTotal, nil
    })
    if err != nil {
        return nil, err
    }

    result := &ESPITPageResult[R]{
        List:         list,
        Total:        total,
        HasMore:      hasMore,
        PitID:        resultPitID,
        CursorValues: nextCursorValues,
    }

    // 最后一页:清空 PIT 信息,提示调用方无需继续
    if !hasMore {
        result.PitID = ""
        result.CursorValues = nil
    }

    return result, nil
}

这里有几个值得注意的点:

  • 方法开头临时把 needPaginationisCursorQuery 设为 true,让中间件能正确感知这是一次游标分页查询,查完后通过 defer 恢复原值,不影响构建器后续复用。
  • 查询走的是 executeWithMiddlewares,所以缓存、日志、指标这些中间件对 PIT 查询同样生效,不需要额外适配。
  • 最后一页时自动清空 PitIDCursorValues,调用方拿到空值就知道翻完了。

4.3 底层引擎重构:doCursorQuery 的演进

doCursorQuery 是游标分页的核心执行函数。引入 PIT 之前,它就是个纯粹的 search_after 查询,不管快照的事。这次改动主要涉及三块:签名变更、PIT 管理、hasMore 判断。

签名变更

// 旧版(无 PIT 支持)
func (e *ElasticSearchBuilder[R]) doCursorQuery(
    ctx context.Context, cursorValues []any, isFirstBatch bool,
) ([]*R, []any, int64, error)

// 新版(PIT + hasMore)
func (e *ElasticSearchBuilder[R]) doCursorQuery(
    ctx context.Context, cursorValues []any,
    isFirstBatch, forcePIT bool,
    pitID *string,
) (list []*R, nextCursorValues []any, total int64, hasMore bool, err error)

变化不大但很关键:

  • 加了 pitID *string 参数,用指针是因为需要在多批次查询间传递和更新 PIT ID
  • 加了 forcePIT 参数,区分"全量遍历自动 PIT"和"跨请求强制 PIT"两种场景
  • 加了 hasMore 返回值,给 QueryPageWithPIT 用来判断是否还有下一页

从直连索引到 PIT 快照

旧版直接拿索引名查,没有快照保障:

// 旧版:直接使用 Index 查询,无 PIT 支持
searchService := e.builder.data.ElasticSearch.Search().
    Index(e.index).
    Query(filter).
    Size(batchSize)

新版通过 forcePITusePIT 做了双层控制:

querySize := batchSize
if forcePIT {
    // PIT 分页场景通过多取 1 条记录来判断是否还有下一页
    querySize = batchSize + 1
}

usePIT := forcePIT || !e.builder.needPagination
  • forcePIT = true(来自 QueryPageWithPIT):强制用 PIT,并多取 1 条来探测 hasMore
  • !needPagination(来自 QueryCursor 全量遍历):自动开 PIT 保证遍历一致性
  • 两者都为 false 时退化为旧版行为,直接 Index(e.index) 查询

PIT 生命周期管理

usePIT := forcePIT || !e.builder.needPagination
openedPITByThisCall := false

defer func() {
    // 如果本次调用打开了 PIT 但查询失败,主动关闭以避免资源泄漏
    if err != nil && openedPITByThisCall && pitID != nil && *pitID != "" {
        e.closePIT(*pitID)
        *pitID = ""
    }
}()

if usePIT {
    if pitID == nil {
        return nil, nil, 0, false, errors.New("pitID pointer is nil")
    }
    if *pitID == "" {
        // 首次调用:自动打开 PIT
        openResp, err := e.builder.data.ElasticSearch.OpenPointInTime(e.index).
            KeepAlive(e.pitKeepAliveString()).Do(ctx)
        if err != nil {
            return nil, nil, 0, false, err
        }
        *pitID = openResp.Id
        openedPITByThisCall = true
    }
    // 使用 PIT 查询时不指定索引名(PIT 已绑定索引)
    searchService = searchService.PointInTime(
        elastic.NewPointInTimeWithKeepAlive(*pitID, e.pitKeepAliveString()),
    )
} else {
    searchService = searchService.Index(e.index)
}

关键设计

  • openedPITByThisCall 这个标记比较巧妙——如果是本次调用打开的 PIT,但后续查询挂了,就需要主动关掉避免 ES 资源泄漏
  • PIT 查询时不指定索引名,因为 PIT 本身已经绑定了索引

hasMore 探测与结果裁剪

hits := searchResult.Hits.Hits
hasMore = forcePIT && len(hits) > batchSize

effectiveHits := hits
if hasMore {
    effectiveHits = hits[:batchSize]  // 裁掉多取的那 1 条
}

list = make([]*R, 0, len(effectiveHits))
for _, hit := range effectiveHits {
    var item R
    if err := json.Unmarshal(hit.Source, &item); err != nil {
        return nil, nil, 0, false, err
    }
    list = append(list, &item)
}

这是个经典的"多取一条"探测技巧:请求 batchSize + 1 条,如果真的返回了这么多,说明后面还有数据;否则就是最后一页了。返回给调用方时把多取的那条裁掉就行。

最后一页自动关闭 PIT

if forcePIT && !hasMore && *pitID != "" {
    e.closePIT(*pitID)
    *pitID = ""
}

forcePIT 模式下,检测到最后一页就自动关闭 PIT。这样即使调用方忘了手动关闭,也不会一直占着 ES 资源。

4.4 新增 closePIT:PIT 资源释放

这个方法比较简单,就是在 PIT 不再需要时主动释放 ES 资源:

const esPITCloseTimeout = 3 * time.Second

func (e *ElasticSearchBuilder[R]) closePIT(pitID string) {
    if pitID == "" {
        return
    }
    closeCtx, cancel := context.WithTimeout(context.Background(), esPITCloseTimeout)
    defer cancel()
    _, _ = e.builder.data.ElasticSearch.ClosePointInTime(pitID).Do(closeCtx)
}

几个细节:超时时间用常量提出来方便调整;用 context.Background() 而不是调用方的 ctx,避免调用方 cancel 了导致 PIT 关不掉;关闭错误直接忽略,因为 PIT 关闭失败不应该影响业务,ES 会在 keep_alive 过期后自动回收。

closePIT 的调用时机由上层控制:QueryCursor 通过 defer 在迭代结束后关闭,doCursorQuery 在最后一页自动关闭,出错时通过 openedPITByThisCall 标记回滚。

4.5 QueryCursor 的重构

旧版 QueryCursor 很简洁——doCursorQuery 的签名刚好匹配回调类型,直接传进去就行:

// 旧版:签名匹配,直接传入
func (e *ElasticSearchBuilder[R]) QueryCursor(ctx context.Context) iter.Seq2[*R, error] {
    // ...
    return e.builder.executeCursorWithMiddlewares(ctx, e.doCursorQuery)
}

但现在 doCursorQuery 多了 forcePITpitIDhasMore 这些参数和返回值,签名对不上了,得包一层闭包来适配。同时还要管理 PIT 的生命周期:

// 新版:包装适配 + PIT 生命周期管理
func (e *ElasticSearchBuilder[R]) QueryCursor(ctx context.Context) iter.Seq2[*R, error] {
    // ...
    var pitID string
    wrappedCursorQuery := func(ctx context.Context, cursorValues []any, isFirstBatch bool) ([]*R, []any, int64, error) {
        // forcePIT = false:由 needPagination 控制是否启用 PIT
        list, nextCursorValues, total, _, err := e.doCursorQuery(ctx, cursorValues, isFirstBatch, false, &pitID)
        return list, nextCursorValues, total, err
    }
    innerIter := e.builder.executeCursorWithMiddlewares(ctx, wrappedCursorQuery)
    return func(yield func(*R, error) bool) {
        defer func() {
            e.closePIT(pitID)  // 迭代结束后关闭 PIT
        }()
        for item, err := range innerIter {
            if !yield(item, err) {
                return
            }
        }
    }
}

几个值得注意的点:

  • wrappedCursorQuery 闭包里传 forcePIT = false,让 usePIT 的决定权交给 needPagination
  • hasMore 返回值直接 _ 丢掉——全量遍历模式下不需要这个,"本批次有没有数据"就够判断了
  • 外层套了个 defer e.closePIT(pitID),不管是正常遍历完还是中途 break,PIT 都能被正确释放

4.6 结果解析的重构

旧版把搜索和 JSON 解析揉在同一个并行回调里:

// 旧版:在搜索回调内部解析
if err := util.WaitAndGo(func() error {
    searchResult, err = searchService.Do(ctx)
    if err != nil {
        return err
    }
    // 解析与搜索耦合在同一个回调中
    for _, hit := range searchResult.Hits.Hits {
        var item R
        if err := json.Unmarshal(hit.Source, &item); err != nil {
            return err
        }
        list = append(list, &item)
    }
    return nil
}, func() error {
    // Count 查询...
})

新版把这两步拆开了——回调里只管搜索和更新 PIT ID,hits 的解析放到并行任务跑完之后统一处理:

// 新版:搜索与解析分离
if err = util.WaitAndGo(func() error {
    searchResult, err = searchService.Do(ctx)
    if err != nil {
        return err
    }
    // 只更新 PIT ID,不解析 hits
    if usePIT && *pitID != "" && searchResult.PitId != "" {
        *pitID = searchResult.PitId
    }
    return nil
}, func() error {
    // Count 查询...
})

// 并行任务完成后,统一处理 hits
hits := searchResult.Hits.Hits
hasMore = forcePIT && len(hits) > batchSize

effectiveHits := hits
if hasMore {
    effectiveHits = hits[:batchSize]  // 裁掉探测用的多余 1 条
}

list = make([]*R, 0, len(effectiveHits))
for _, hit := range effectiveHits {
    var item R
    if err := json.Unmarshal(hit.Source, &item); err != nil {
        return nil, nil, 0, false, err
    }
    list = append(list, &item)
}

为什么要拆?因为现在有 hasMore 探测逻辑——需要先看 hits 数量决定裁不裁剪,再去解析。如果还是老写法,就得先解析完再丢掉多余的那条,白白浪费一次反序列化。

另外还顺手修了个 Count 查询的触发条件。旧版多了个 e.afterHook == nil 的判断:

// 旧版
if !isFirstBatch || !e.builder.needTotal || e.afterHook == nil {
    return nil
}

新版移除了这个条件:

// 新版
if !isFirstBatch || !e.builder.needTotal {
    return nil
}

这个条件其实不合理——needTotal 本身就是"要不要查总数"的开关,跟有没有设 AfterQueryHook 没关系。用户说要总数,那就该查,不管有没有 hook。

4.7 Clone 的完善

既然加了新字段,Clone 方法自然也得跟着拷贝:

func (e *ElasticSearchBuilder[R]) Clone() *ElasticSearchBuilder[R] {
    cloned := &ElasticSearchBuilder[R]{
        index:        e.index,
        filter:       e.filter,
        pitKeepAlive: e.pitKeepAlive, // 新增:拷贝 PIT 保活时长
        pitID:        e.pitID,        // 新增:拷贝 PIT ID
    }
    // ...
}

4.8 完整调用链路图

最后用一张图把整条链路串起来,方便对照代码理解:

┌─────────────────────────────────────────────────────────────────┐
│                        业务层                                    │
│                                                                 │
│  场景A: 全量遍历(数据导出)          场景B: 跨请求分页(API 翻页) │
│  ┌──────────────────────┐          ┌──────────────────────────┐ │
│  │ b.SetNeedPagination  │          │ b.SetPITID(prevPitID)    │ │
│  │   (false)            │          │ b.SetCursorValue(...)    │ │
│  │ b.QueryCursor(ctx)   │          │ b.QueryPageWithPIT(ctx)  │ │
│  └──────────┬───────────┘          └──────────┬───────────────┘ │
└─────────────┼──────────────────────────────────┼────────────────┘
              │                                  │
              ▼                                  ▼
┌─────────────────────────┐    ┌──────────────────────────────────┐
│    QueryCursor          │    │      QueryPageWithPIT            │
│                         │    │                                  │
│  forcePIT = false       │    │  forcePIT = true                 │
│  usePIT = !needPaginate │    │  usePIT = true (强制)            │
│  hasMore: 忽略          │    │  hasMore: 多取1条探测             │
│  PIT关闭: defer         │    │  PIT关闭: 最后一页自动关闭         │
└──────────┬──────────────┘    └──────────┬───────────────────────┘
           │                              │
           └──────────┬───────────────────┘
                      ▼
         ┌────────────────────────┐
         │     doCursorQuery      │
         │                        │
         │  1. 按需打开 PIT        │
         │  2. 构建 search_after   │
         │  3. 并行: 查询 + Count  │
         │  4. 解析 hits           │
         │  5. 提取 sort values    │
         │  6. 按需关闭 PIT        │
         └────────────────────────┘

五、实际怎么用

5.1 全量遍历(数据导出场景)

典型的数据导出需求,把索引里符合条件的文档全部遍历一遍:

b := builder.NewElasticSearchBuilder[Doc](
    builder.NewDBProxy(nil, nil, esClient), "my_index",
)
b.SetFilter(elastic.NewTermQuery("status", "active"))
b.SetCursorField("created_at")
b.SetSort(elastic.NewFieldSort("_id").Asc())
b.SetLimit(500)
b.SetNeedPagination(false)          // 关闭分页 → 自动启用 PIT
b.SetPitKeepAlive(2 * time.Minute)  // PIT 保活时长

for doc, err := range b.QueryCursor(ctx) {
    if err != nil {
        log.Printf("遍历错误: %v", err)
        break
    }
    export(doc)
}
// PIT 在迭代结束后自动关闭(defer)

不需要手动管理 PIT 的生命周期,QueryCursor 内部会在迭代结束(包括 break 提前退出)时自动关闭。

5.2 跨请求分页(API 翻页场景)

这是更常见的场景——前端点「下一页」,后端需要接着上次的位置继续查:

// 首页请求
es := builder.NewElasticSearchBuilder[Doc](
    builder.NewDBProxy(nil, nil, esClient), "my_index",
)
es.SetFilter(elastic.NewMatchAllQuery()).
    SetCursorField("created_at", "id").
    SetLimit(20)

page, err := es.QueryPageWithPIT(ctx)
// 持久化 page.PitID + page.CursorValues → 编码为 page_token 返回给客户端

// 下一页请求:从 page_token 中解码出 pitID 和 cursorValues
es.SetPITID(prevPitID).SetCursorValue(prevCursorValues...)
page, err = es.QueryPageWithPIT(ctx)

// 最后一页:page.HasMore == false, page.PitID == ""
// PIT 已自动关闭,无需手动处理

后端 API 业务层数据组装参考

实际项目中,QueryPageWithPIT 返回的 ESPITPageResult 还需要包装成前后端约定的分页协议才能用。下面是一个比较通用的接口契约设计:

请求参数

字段类型说明
page_sizeinteger每页条数
page_tokenstring不透明的分页令牌(首页为空)

响应参数

字段类型说明
itemsarray当前页数据
next_page_tokenstring下一页令牌(无更多数据时为空)
has_moreboolean是否还有下一页

page_token 生成策略

page_token 说白了就是把 pit_id + cursor_values 序列化打包成一个不透明字符串。具体流程:

  1. 构建载荷:{"pit_id":"...","cursor_values":[...],"exp":...,"v":1}
  2. JSON 序列化后做 Base64URL 编码
  3. 如果有安全需求,可以加 HMAC 签名或 AES-GCM 加密
  4. 每次请求解码 page_token 后,校验版本号、过期时间、签名,通过后再调用 SetPITID + SetCursorValue
// page_token 载荷结构示例
type PageToken struct {
    PitID        string `json:"pit_id"`
    CursorValues []any  `json:"cursor_values"`
    Exp          int64  `json:"exp"` // 过期时间戳
    Version      int    `json:"v"`   // 版本号,用于兼容性控制
}

// 编码:ESPITPageResult → page_token
func EncodePageToken(result *builder.ESPITPageResult[Doc]) string {
    if !result.HasMore {
        return "" // 最后一页,无需生成 token
    }
    token := PageToken{
        PitID:        result.PitID,
        CursorValues: result.CursorValues,
        Exp:          time.Now().Add(5 * time.Minute).Unix(),
        Version:      1,
    }
    data, _ := json.Marshal(token)
    // 实际生产中应添加 HMAC 签名或加密
    return base64.RawURLEncoding.EncodeToString(data)
}

// 解码:page_token → pitID + cursorValues
func DecodePageToken(tokenStr string) (*PageToken, error) {
    data, err := base64.RawURLEncoding.DecodeString(tokenStr)
    if err != nil {
        return nil, fmt.Errorf("invalid page token: %w", err)
    }
    var token PageToken
    if err := json.Unmarshal(data, &token); err != nil {
        return nil, fmt.Errorf("invalid page token payload: %w", err)
    }
    // 校验版本号和过期时间
    if token.Version != 1 {
        return nil, fmt.Errorf("unsupported page token version: %d", token.Version)
    }
    if time.Now().Unix() > token.Exp {
        return nil, fmt.Errorf("page token expired")
    }
    return &token, nil
}

几个实践中容易踩坑的点:

  • PIT 有 keep_alive 窗口,过期了就废了。如果用户隔了很久才翻下一页,token 解码后发现 PIT 已失效,老老实实从首页重新来一遍
  • 排序键要稳定(业务时间戳 + 唯一 ID 是比较靠谱的组合),不然 search_after 的结果本身就不确定
  • HasMore 是通过 limit+1 探测出来的,作为前端展示「查看更多」的依据足够了

六、总结

把三种方案放在一起对比一下:

方案一致性深度分页资源开销适用场景
from + size❌ 不一致❌ 有上限高(越深越慢)前几页的简单分页
search_after❌ 不一致✅ 无上限低(恒定)单次请求内的游标遍历
search_after + PIT✅ 一致✅ 无上限低(恒定 + PIT 快照)跨请求分页、数据导出

search_after + PIT 方案已经在 QueryBuilder 中落地,对外暴露两个入口:

  • QueryCursor:全量遍历迭代器,SetNeedPagination(false) 时自动启用 PIT,适合数据导出等批处理场景
  • QueryPageWithPIT:单批次 PIT 分页查询,返回 ESPITPageResult(含 PitIDCursorValuesHasMore),适合跨请求的 API 翻页

实现上几个值得一提的设计选择:

  1. 通过 forcePIT 参数区分「自动 PIT」和「强制 PIT」两种场景,底层复用同一个 doCursorQuery 引擎
  2. QueryPageWithPITbatchSize + 1 的方式探测 hasMore,省掉了一次额外的 Count 查询
  3. PIT 的生命周期做了三重保障:出错时回滚、最后一页自动关闭、迭代结束时 defer 兜底
  4. QueryPageWithPITexecuteWithMiddlewares 管道,日志、指标、缓存等中间件照常生效

简单说:search_after 解决了深度分页的性能问题,PIT 解决了跨请求的一致性问题。两者结合起来,基本上就是 ES 分页的最优解了。