用search_after做深度分页的同学,有没有遇到过翻页数据重复或莫名丢失的情况?如果你的索引 refresh 频率比较高,这个坑大概率踩过。本文聊聊这个问题的根因,以及如何用search_after+Point-in-Time (PIT)彻底解决它。最后会结合 QueryBuilder 的实际代码,把整条链路串一遍。
一、ElasticSearch 的分页困境
1.1 from + size 的老问题
ES 最基础的分页就是 from + size:
GET /my_index/_search
{
"from": 10000,
"size": 20,
"query": { "match_all": {} },
"sort": [{ "created_at": "desc" }]
}问题大家都清楚——翻得越深越慢。from = 10000, size = 20 时,每个分片要先取出 10020 条文档发给协调节点,协调节点再做全局排序、丢掉前 10000 条。这个开销随 from 线性增长,所以 ES 默认把 from + size 上限卡在 10000(index.max_result_window),超了直接报错。
1.2 search_after:游标式深度分页
search_after 是官方推荐的替代方案,思路很简单:不用偏移量了,拿上一页最后一条的排序值当游标,直接跳到下一页的起点。
// 第一页
GET /my_index/_search
{
"size": 20,
"query": { "match_all": {} },
"sort": [
{ "created_at": "desc" },
{ "_id": "asc" }
]
}
// 第二页:把第一页最后一条的 sort values 塞进来
GET /my_index/_search
{
"size": 20,
"query": { "match_all": {} },
"sort": [
{ "created_at": "desc" },
{ "_id": "asc" }
],
"search_after": ["2026-05-17T10:30:00Z", "doc_999"]
}好处很明显:
- 不管翻到第几页,每次查询开销都是固定的(只取
size条) - 不受
max_result_window限制
但这里有个隐含前提——search_after 假设两次请求之间,索引的数据和排序是稳定的。一旦这个前提被打破,问题就来了。
二、高频 refresh 下的排序不稳定问题
2.1 问题根因
ES 是近实时(Near Real-Time)搜索引擎,写入的文档不会立即可搜索,要等 refresh 之后才对搜索可见。默认 refresh 间隔是 1 秒(index.refresh_interval)。
这就带来一个问题:用 search_after 做跨请求分页时,两次请求之间很可能发生 refresh,导致:
- 新文档被 refresh 进来——如果它的排序值恰好落在已翻过的页和当前页之间,后续页的文档整体"后移",你会看到重复数据
- 旧文档被删除或更新——排序值变了,某些文档直接被跳过
- 段合并(Segment Merge)——后台合并改变了文档物理存储顺序,影响
_doc等隐式排序
画个时间线就很清楚了:
请求1(第1页) 请求2(第2页)
│ │
▼ ▼
┌─────────┐ refresh 发生 ┌─────────┐
│ 文档A │ ──────────► │ 文档X(新)│ ← 新文档插入
│ 文档B │ │ 文档A │ ← 文档A 重复出现!
│ 文档C │ │ 文档B │
│ ... │ │ ... │
└─────────┘ └─────────┘2.2 影响场景
这个问题在以下场景里比较明显:
| 场景 | 影响程度 | 说明 |
|---|---|---|
| 数据导出 | 🔴 严重 | 全量遍历时数据重复或遗漏,导出结果不完整 |
| 后台管理列表翻页 | 🟡 中等 | 用户翻页看到重复数据,体验差 |
| 移动端"加载更多" | 🟡 中等 | 下拉加载出现已看过的内容 |
| 实时数据流消费 | 🔴 严重 | 消费位点不准确,可能重复处理或丢失数据 |
2.3 为什么加 _id 排序也没用?
可能有人会想:加一个唯一的 _id 作为 tiebreaker 排序字段不就行了?
_id 确实能保证排序的唯一性,但它没法阻止新文档插入到已翻过的区间。search_after 的语义是"给我排序值大于这个游标的文档"——refresh 后如果出现了新的符合条件的文档,它们照样会出现在结果里。
所以根本问题不在排序是否唯一,而在于两次查询看到的数据视图不一致。
三、Point-in-Time (PIT):给索引拍个快照
3.1 PIT 是什么?
Point-in-Time (PIT) 是 ES 7.10 引入的特性。简单说,它能给索引创建一个轻量级快照——在这个快照的生命周期内,不管外面怎么 refresh、怎么写入新数据、怎么段合并,你的查询始终基于同一份数据视图。
PIT 创建
│
▼
┌──────────────────────────────────────┐
│ 索引快照(冻结视图) │
│ │
│ 请求1 ──► 基于快照查询 ──► 结果1 │
│ 请求2 ──► 基于快照查询 ──► 结果2 │ ← 数据一致!
│ 请求3 ──► 基于快照查询 ──► 结果3 │
│ │
└──────────────────────────────────────┘
│
PIT 关闭这就是上一章问题的解药——视图不变,排序自然稳定。
3.2 PIT 的工作流程
用起来也不复杂,四步走:
- 创建 PIT:
POST /my_index/_pit?keep_alive=1m,ES 返回一个pit_id - 带着 PIT 查询:搜索请求里携带
pit_id,这时候不需要再指定索引名了(PIT 已经绑定了索引) - 续期:每次查询时带上
keep_alive参数,ES 会返回可能更新过的pit_id - 关闭 PIT:用完了主动关闭,释放 ES 端的资源
3.3 search_after + PIT:组合起来
把 search_after 和 PIT 结合在一起,就能同时拿到深度分页的性能优势和数据一致性保障:
// 1. 创建 PIT
POST /my_index/_pit?keep_alive=1m
// 返回: { "id": "46ToAwMDaWR5BXV1..." }
// 2. 第一页查询(携带 PIT)
GET /_search
{
"size": 20,
"query": { "match_all": {} },
"pit": {
"id": "46ToAwMDaWR5BXV1...",
"keep_alive": "1m"
},
"sort": [
{ "created_at": "desc" },
{ "_id": "asc" }
]
}
// 3. 第二页查询(search_after + PIT)
GET /_search
{
"size": 20,
"query": { "match_all": {} },
"pit": {
"id": "46ToAwMDaWR5BXV1...", // 使用上一次返回的 pit_id
"keep_alive": "1m"
},
"sort": [
{ "created_at": "desc" },
{ "_id": "asc" }
],
"search_after": ["2026-05-17T10:30:00Z", "doc_999"]
}
// 4. 遍历完成后关闭 PIT
DELETE /_pit
{ "id": "46ToAwMDaWR5BXV1..." }四、QueryBuilder 中的实现:代码链路梳理
接下来结合 QueryBuilder 的代码,看看 search_after + PIT 方案落地时需要处理哪些细节。从接口设计到底层执行,逐层过一遍。
4.1 新增的数据结构与字段
先看结构体层面的变化。ElasticSearchBuilder 加了两个字段——pitKeepAlive 控制 PIT 保活时长,pitID 用于跨请求传递 PIT 会话:
type ElasticSearchBuilder[R any] struct {
builder[*ElasticSearchBuilder[R], R]
index string
filter elastic.Query
sort []elastic.Sorter
pitKeepAlive time.Duration // 新增:Point-in-Time 保持时间
pitID string // 新增:外部传入/内部更新的 PIT ID(用于跨请求分页)
}同时新增了 ESPITPageResult 结构体,作为 PIT 分页查询的返回结果:
// ESPITPageResult ES PIT 分页查询结果。
// PitID 与 CursorValues 可用于下一批查询。
type ESPITPageResult[R any] struct {
List []*R // 当前页数据
Total int64 // 总数(仅首页查询时返回)
HasMore bool // 是否还有下一页
PitID string // 下一页使用的 PIT ID
CursorValues []any // 下一页使用的游标值
}还有一个防御性的错误定义:
// ErrPITCursorWithoutPITID ElasticSearch 单批次分页查询模式下未提供 PIT ID 的错误
ErrPITCursorWithoutPITID = errors.New("PIT ID is required when cursor values are provided")逻辑很简单:如果调用方传了 cursorValues(说明不是第一页),但没传 pitID,那 PIT 会话肯定丢了。这时候与其静默返回错误数据,不如直接报错让调用方知道该重新开始了。
4.2 新增 API:SetPitKeepAlive、SetPITID 与 QueryPageWithPIT
SetPitKeepAlive 用来设置 PIT 的保活时长:
func (e *ElasticSearchBuilder[R]) SetPitKeepAlive(keepAlive time.Duration) *ElasticSearchBuilder[R] {
e.pitKeepAlive = keepAlive
return e
}不设置的话默认 1 分钟(pitKeepAliveString() 内部兜底)。如果翻页间隔比较大,建议调大一些(比如 2 * time.Minute),免得 PIT 在两次请求之间过期。
SetPITID 允许外部传入 PIT ID,用于跨请求续查:
func (e *ElasticSearchBuilder[R]) SetPITID(pitID string) *ElasticSearchBuilder[R] {
e.pitID = pitID
return e
}重头戏是 QueryPageWithPIT,执行基于 PIT + search_after 的单批次分页查询:
func (e *ElasticSearchBuilder[R]) QueryPageWithPIT(ctx context.Context) (*ESPITPageResult[R], error) {
if err := e.builder.prepareAndValidate(); err != nil {
return nil, err
}
if e.index == "" {
return nil, errors.New("elasticsearch index not configured")
}
// 防御性校验:有游标值但没有 PIT ID,说明会话已丢失
if len(e.builder.cursorValues) > 0 && e.pitID == "" {
return nil, ErrPITCursorWithoutPITID
}
// 临时覆盖分页标志,确保 QueryMeta 和执行语义一致
oldNeedPagination := e.builder.needPagination
oldIsCursorQuery := e.builder.isCursorQuery
oldPitID := e.pitID
defer func() {
e.builder.needPagination = oldNeedPagination
e.builder.isCursorQuery = oldIsCursorQuery
e.pitID = oldPitID
}()
e.builder.needPagination = true
e.builder.isCursorQuery = true
isFirstBatch := len(e.builder.cursorValues) == 0
var (
nextCursorValues []any
hasMore bool
resultPitID string
)
// 通过中间件管道执行查询
list, total, err := e.builder.executeWithMiddlewares(ctx, func(ctx context.Context) ([]*R, int64, error) {
batchList, batchNextCursorValues, batchTotal, batchHasMore, queryErr := e.doCursorQuery(
ctx, e.builder.cursorValues, isFirstBatch, true, &e.pitID,
)
if queryErr != nil {
return nil, 0, queryErr
}
nextCursorValues = batchNextCursorValues
hasMore = batchHasMore
resultPitID = e.pitID
return batchList, batchTotal, nil
})
if err != nil {
return nil, err
}
result := &ESPITPageResult[R]{
List: list,
Total: total,
HasMore: hasMore,
PitID: resultPitID,
CursorValues: nextCursorValues,
}
// 最后一页:清空 PIT 信息,提示调用方无需继续
if !hasMore {
result.PitID = ""
result.CursorValues = nil
}
return result, nil
}这里有几个值得注意的点:
- 方法开头临时把
needPagination和isCursorQuery设为true,让中间件能正确感知这是一次游标分页查询,查完后通过defer恢复原值,不影响构建器后续复用。 - 查询走的是
executeWithMiddlewares,所以缓存、日志、指标这些中间件对 PIT 查询同样生效,不需要额外适配。 - 最后一页时自动清空
PitID和CursorValues,调用方拿到空值就知道翻完了。
4.3 底层引擎重构:doCursorQuery 的演进
doCursorQuery 是游标分页的核心执行函数。引入 PIT 之前,它就是个纯粹的 search_after 查询,不管快照的事。这次改动主要涉及三块:签名变更、PIT 管理、hasMore 判断。
签名变更
// 旧版(无 PIT 支持)
func (e *ElasticSearchBuilder[R]) doCursorQuery(
ctx context.Context, cursorValues []any, isFirstBatch bool,
) ([]*R, []any, int64, error)
// 新版(PIT + hasMore)
func (e *ElasticSearchBuilder[R]) doCursorQuery(
ctx context.Context, cursorValues []any,
isFirstBatch, forcePIT bool,
pitID *string,
) (list []*R, nextCursorValues []any, total int64, hasMore bool, err error)变化不大但很关键:
- 加了
pitID *string参数,用指针是因为需要在多批次查询间传递和更新 PIT ID - 加了
forcePIT参数,区分"全量遍历自动 PIT"和"跨请求强制 PIT"两种场景 - 加了
hasMore返回值,给QueryPageWithPIT用来判断是否还有下一页
从直连索引到 PIT 快照
旧版直接拿索引名查,没有快照保障:
// 旧版:直接使用 Index 查询,无 PIT 支持
searchService := e.builder.data.ElasticSearch.Search().
Index(e.index).
Query(filter).
Size(batchSize)新版通过 forcePIT 和 usePIT 做了双层控制:
querySize := batchSize
if forcePIT {
// PIT 分页场景通过多取 1 条记录来判断是否还有下一页
querySize = batchSize + 1
}
usePIT := forcePIT || !e.builder.needPaginationforcePIT = true(来自QueryPageWithPIT):强制用 PIT,并多取 1 条来探测hasMore!needPagination(来自QueryCursor全量遍历):自动开 PIT 保证遍历一致性- 两者都为
false时退化为旧版行为,直接Index(e.index)查询
PIT 生命周期管理
usePIT := forcePIT || !e.builder.needPagination
openedPITByThisCall := false
defer func() {
// 如果本次调用打开了 PIT 但查询失败,主动关闭以避免资源泄漏
if err != nil && openedPITByThisCall && pitID != nil && *pitID != "" {
e.closePIT(*pitID)
*pitID = ""
}
}()
if usePIT {
if pitID == nil {
return nil, nil, 0, false, errors.New("pitID pointer is nil")
}
if *pitID == "" {
// 首次调用:自动打开 PIT
openResp, err := e.builder.data.ElasticSearch.OpenPointInTime(e.index).
KeepAlive(e.pitKeepAliveString()).Do(ctx)
if err != nil {
return nil, nil, 0, false, err
}
*pitID = openResp.Id
openedPITByThisCall = true
}
// 使用 PIT 查询时不指定索引名(PIT 已绑定索引)
searchService = searchService.PointInTime(
elastic.NewPointInTimeWithKeepAlive(*pitID, e.pitKeepAliveString()),
)
} else {
searchService = searchService.Index(e.index)
}关键设计:
openedPITByThisCall这个标记比较巧妙——如果是本次调用打开的 PIT,但后续查询挂了,就需要主动关掉避免 ES 资源泄漏- PIT 查询时不指定索引名,因为 PIT 本身已经绑定了索引
hasMore 探测与结果裁剪
hits := searchResult.Hits.Hits
hasMore = forcePIT && len(hits) > batchSize
effectiveHits := hits
if hasMore {
effectiveHits = hits[:batchSize] // 裁掉多取的那 1 条
}
list = make([]*R, 0, len(effectiveHits))
for _, hit := range effectiveHits {
var item R
if err := json.Unmarshal(hit.Source, &item); err != nil {
return nil, nil, 0, false, err
}
list = append(list, &item)
}这是个经典的"多取一条"探测技巧:请求 batchSize + 1 条,如果真的返回了这么多,说明后面还有数据;否则就是最后一页了。返回给调用方时把多取的那条裁掉就行。
最后一页自动关闭 PIT
if forcePIT && !hasMore && *pitID != "" {
e.closePIT(*pitID)
*pitID = ""
}在 forcePIT 模式下,检测到最后一页就自动关闭 PIT。这样即使调用方忘了手动关闭,也不会一直占着 ES 资源。
4.4 新增 closePIT:PIT 资源释放
这个方法比较简单,就是在 PIT 不再需要时主动释放 ES 资源:
const esPITCloseTimeout = 3 * time.Second
func (e *ElasticSearchBuilder[R]) closePIT(pitID string) {
if pitID == "" {
return
}
closeCtx, cancel := context.WithTimeout(context.Background(), esPITCloseTimeout)
defer cancel()
_, _ = e.builder.data.ElasticSearch.ClosePointInTime(pitID).Do(closeCtx)
}几个细节:超时时间用常量提出来方便调整;用 context.Background() 而不是调用方的 ctx,避免调用方 cancel 了导致 PIT 关不掉;关闭错误直接忽略,因为 PIT 关闭失败不应该影响业务,ES 会在 keep_alive 过期后自动回收。
closePIT 的调用时机由上层控制:QueryCursor 通过 defer 在迭代结束后关闭,doCursorQuery 在最后一页自动关闭,出错时通过 openedPITByThisCall 标记回滚。
4.5 QueryCursor 的重构
旧版 QueryCursor 很简洁——doCursorQuery 的签名刚好匹配回调类型,直接传进去就行:
// 旧版:签名匹配,直接传入
func (e *ElasticSearchBuilder[R]) QueryCursor(ctx context.Context) iter.Seq2[*R, error] {
// ...
return e.builder.executeCursorWithMiddlewares(ctx, e.doCursorQuery)
}但现在 doCursorQuery 多了 forcePIT、pitID、hasMore 这些参数和返回值,签名对不上了,得包一层闭包来适配。同时还要管理 PIT 的生命周期:
// 新版:包装适配 + PIT 生命周期管理
func (e *ElasticSearchBuilder[R]) QueryCursor(ctx context.Context) iter.Seq2[*R, error] {
// ...
var pitID string
wrappedCursorQuery := func(ctx context.Context, cursorValues []any, isFirstBatch bool) ([]*R, []any, int64, error) {
// forcePIT = false:由 needPagination 控制是否启用 PIT
list, nextCursorValues, total, _, err := e.doCursorQuery(ctx, cursorValues, isFirstBatch, false, &pitID)
return list, nextCursorValues, total, err
}
innerIter := e.builder.executeCursorWithMiddlewares(ctx, wrappedCursorQuery)
return func(yield func(*R, error) bool) {
defer func() {
e.closePIT(pitID) // 迭代结束后关闭 PIT
}()
for item, err := range innerIter {
if !yield(item, err) {
return
}
}
}
}几个值得注意的点:
wrappedCursorQuery闭包里传forcePIT = false,让usePIT的决定权交给needPaginationhasMore返回值直接_丢掉——全量遍历模式下不需要这个,"本批次有没有数据"就够判断了- 外层套了个
defer e.closePIT(pitID),不管是正常遍历完还是中途break,PIT 都能被正确释放
4.6 结果解析的重构
旧版把搜索和 JSON 解析揉在同一个并行回调里:
// 旧版:在搜索回调内部解析
if err := util.WaitAndGo(func() error {
searchResult, err = searchService.Do(ctx)
if err != nil {
return err
}
// 解析与搜索耦合在同一个回调中
for _, hit := range searchResult.Hits.Hits {
var item R
if err := json.Unmarshal(hit.Source, &item); err != nil {
return err
}
list = append(list, &item)
}
return nil
}, func() error {
// Count 查询...
})新版把这两步拆开了——回调里只管搜索和更新 PIT ID,hits 的解析放到并行任务跑完之后统一处理:
// 新版:搜索与解析分离
if err = util.WaitAndGo(func() error {
searchResult, err = searchService.Do(ctx)
if err != nil {
return err
}
// 只更新 PIT ID,不解析 hits
if usePIT && *pitID != "" && searchResult.PitId != "" {
*pitID = searchResult.PitId
}
return nil
}, func() error {
// Count 查询...
})
// 并行任务完成后,统一处理 hits
hits := searchResult.Hits.Hits
hasMore = forcePIT && len(hits) > batchSize
effectiveHits := hits
if hasMore {
effectiveHits = hits[:batchSize] // 裁掉探测用的多余 1 条
}
list = make([]*R, 0, len(effectiveHits))
for _, hit := range effectiveHits {
var item R
if err := json.Unmarshal(hit.Source, &item); err != nil {
return nil, nil, 0, false, err
}
list = append(list, &item)
}为什么要拆?因为现在有 hasMore 探测逻辑——需要先看 hits 数量决定裁不裁剪,再去解析。如果还是老写法,就得先解析完再丢掉多余的那条,白白浪费一次反序列化。
另外还顺手修了个 Count 查询的触发条件。旧版多了个 e.afterHook == nil 的判断:
// 旧版
if !isFirstBatch || !e.builder.needTotal || e.afterHook == nil {
return nil
}新版移除了这个条件:
// 新版
if !isFirstBatch || !e.builder.needTotal {
return nil
}这个条件其实不合理——needTotal 本身就是"要不要查总数"的开关,跟有没有设 AfterQueryHook 没关系。用户说要总数,那就该查,不管有没有 hook。
4.7 Clone 的完善
既然加了新字段,Clone 方法自然也得跟着拷贝:
func (e *ElasticSearchBuilder[R]) Clone() *ElasticSearchBuilder[R] {
cloned := &ElasticSearchBuilder[R]{
index: e.index,
filter: e.filter,
pitKeepAlive: e.pitKeepAlive, // 新增:拷贝 PIT 保活时长
pitID: e.pitID, // 新增:拷贝 PIT ID
}
// ...
}4.8 完整调用链路图
最后用一张图把整条链路串起来,方便对照代码理解:
┌─────────────────────────────────────────────────────────────────┐
│ 业务层 │
│ │
│ 场景A: 全量遍历(数据导出) 场景B: 跨请求分页(API 翻页) │
│ ┌──────────────────────┐ ┌──────────────────────────┐ │
│ │ b.SetNeedPagination │ │ b.SetPITID(prevPitID) │ │
│ │ (false) │ │ b.SetCursorValue(...) │ │
│ │ b.QueryCursor(ctx) │ │ b.QueryPageWithPIT(ctx) │ │
│ └──────────┬───────────┘ └──────────┬───────────────┘ │
└─────────────┼──────────────────────────────────┼────────────────┘
│ │
▼ ▼
┌─────────────────────────┐ ┌──────────────────────────────────┐
│ QueryCursor │ │ QueryPageWithPIT │
│ │ │ │
│ forcePIT = false │ │ forcePIT = true │
│ usePIT = !needPaginate │ │ usePIT = true (强制) │
│ hasMore: 忽略 │ │ hasMore: 多取1条探测 │
│ PIT关闭: defer │ │ PIT关闭: 最后一页自动关闭 │
└──────────┬──────────────┘ └──────────┬───────────────────────┘
│ │
└──────────┬───────────────────┘
▼
┌────────────────────────┐
│ doCursorQuery │
│ │
│ 1. 按需打开 PIT │
│ 2. 构建 search_after │
│ 3. 并行: 查询 + Count │
│ 4. 解析 hits │
│ 5. 提取 sort values │
│ 6. 按需关闭 PIT │
└────────────────────────┘五、实际怎么用
5.1 全量遍历(数据导出场景)
典型的数据导出需求,把索引里符合条件的文档全部遍历一遍:
b := builder.NewElasticSearchBuilder[Doc](
builder.NewDBProxy(nil, nil, esClient), "my_index",
)
b.SetFilter(elastic.NewTermQuery("status", "active"))
b.SetCursorField("created_at")
b.SetSort(elastic.NewFieldSort("_id").Asc())
b.SetLimit(500)
b.SetNeedPagination(false) // 关闭分页 → 自动启用 PIT
b.SetPitKeepAlive(2 * time.Minute) // PIT 保活时长
for doc, err := range b.QueryCursor(ctx) {
if err != nil {
log.Printf("遍历错误: %v", err)
break
}
export(doc)
}
// PIT 在迭代结束后自动关闭(defer)不需要手动管理 PIT 的生命周期,QueryCursor 内部会在迭代结束(包括 break 提前退出)时自动关闭。
5.2 跨请求分页(API 翻页场景)
这是更常见的场景——前端点「下一页」,后端需要接着上次的位置继续查:
// 首页请求
es := builder.NewElasticSearchBuilder[Doc](
builder.NewDBProxy(nil, nil, esClient), "my_index",
)
es.SetFilter(elastic.NewMatchAllQuery()).
SetCursorField("created_at", "id").
SetLimit(20)
page, err := es.QueryPageWithPIT(ctx)
// 持久化 page.PitID + page.CursorValues → 编码为 page_token 返回给客户端
// 下一页请求:从 page_token 中解码出 pitID 和 cursorValues
es.SetPITID(prevPitID).SetCursorValue(prevCursorValues...)
page, err = es.QueryPageWithPIT(ctx)
// 最后一页:page.HasMore == false, page.PitID == ""
// PIT 已自动关闭,无需手动处理后端 API 业务层数据组装参考
实际项目中,QueryPageWithPIT 返回的 ESPITPageResult 还需要包装成前后端约定的分页协议才能用。下面是一个比较通用的接口契约设计:
请求参数:
| 字段 | 类型 | 说明 |
|---|---|---|
page_size | integer | 每页条数 |
page_token | string | 不透明的分页令牌(首页为空) |
响应参数:
| 字段 | 类型 | 说明 |
|---|---|---|
items | array | 当前页数据 |
next_page_token | string | 下一页令牌(无更多数据时为空) |
has_more | boolean | 是否还有下一页 |
page_token 生成策略
page_token 说白了就是把 pit_id + cursor_values 序列化打包成一个不透明字符串。具体流程:
- 构建载荷:
{"pit_id":"...","cursor_values":[...],"exp":...,"v":1} - JSON 序列化后做 Base64URL 编码
- 如果有安全需求,可以加 HMAC 签名或 AES-GCM 加密
- 每次请求解码
page_token后,校验版本号、过期时间、签名,通过后再调用SetPITID+SetCursorValue
// page_token 载荷结构示例
type PageToken struct {
PitID string `json:"pit_id"`
CursorValues []any `json:"cursor_values"`
Exp int64 `json:"exp"` // 过期时间戳
Version int `json:"v"` // 版本号,用于兼容性控制
}
// 编码:ESPITPageResult → page_token
func EncodePageToken(result *builder.ESPITPageResult[Doc]) string {
if !result.HasMore {
return "" // 最后一页,无需生成 token
}
token := PageToken{
PitID: result.PitID,
CursorValues: result.CursorValues,
Exp: time.Now().Add(5 * time.Minute).Unix(),
Version: 1,
}
data, _ := json.Marshal(token)
// 实际生产中应添加 HMAC 签名或加密
return base64.RawURLEncoding.EncodeToString(data)
}
// 解码:page_token → pitID + cursorValues
func DecodePageToken(tokenStr string) (*PageToken, error) {
data, err := base64.RawURLEncoding.DecodeString(tokenStr)
if err != nil {
return nil, fmt.Errorf("invalid page token: %w", err)
}
var token PageToken
if err := json.Unmarshal(data, &token); err != nil {
return nil, fmt.Errorf("invalid page token payload: %w", err)
}
// 校验版本号和过期时间
if token.Version != 1 {
return nil, fmt.Errorf("unsupported page token version: %d", token.Version)
}
if time.Now().Unix() > token.Exp {
return nil, fmt.Errorf("page token expired")
}
return &token, nil
}几个实践中容易踩坑的点:
- PIT 有
keep_alive窗口,过期了就废了。如果用户隔了很久才翻下一页,token 解码后发现 PIT 已失效,老老实实从首页重新来一遍 - 排序键要稳定(业务时间戳 + 唯一 ID 是比较靠谱的组合),不然
search_after的结果本身就不确定 HasMore是通过limit+1探测出来的,作为前端展示「查看更多」的依据足够了
六、总结
把三种方案放在一起对比一下:
| 方案 | 一致性 | 深度分页 | 资源开销 | 适用场景 |
|---|---|---|---|---|
from + size | ❌ 不一致 | ❌ 有上限 | 高(越深越慢) | 前几页的简单分页 |
search_after | ❌ 不一致 | ✅ 无上限 | 低(恒定) | 单次请求内的游标遍历 |
search_after + PIT | ✅ 一致 | ✅ 无上限 | 低(恒定 + PIT 快照) | 跨请求分页、数据导出 |
search_after + PIT 方案已经在 QueryBuilder 中落地,对外暴露两个入口:
QueryCursor:全量遍历迭代器,SetNeedPagination(false)时自动启用 PIT,适合数据导出等批处理场景QueryPageWithPIT:单批次 PIT 分页查询,返回ESPITPageResult(含PitID、CursorValues、HasMore),适合跨请求的 API 翻页
实现上几个值得一提的设计选择:
- 通过
forcePIT参数区分「自动 PIT」和「强制 PIT」两种场景,底层复用同一个doCursorQuery引擎 QueryPageWithPIT用batchSize + 1的方式探测hasMore,省掉了一次额外的 Count 查询- PIT 的生命周期做了三重保障:出错时回滚、最后一页自动关闭、迭代结束时 defer 兜底
QueryPageWithPIT走executeWithMiddlewares管道,日志、指标、缓存等中间件照常生效
简单说:search_after 解决了深度分页的性能问题,PIT 解决了跨请求的一致性问题。两者结合起来,基本上就是 ES 分页的最优解了。