In this article we'll stand up an Elasticsearch search engine and use Canal, Alibaba's open-source middleware, to sync user data from MySQL in real time.

We know MySQL's master-slave replication works as follows:

  1. The master writes data changes to its binary log as binary log events, which can be inspected with show binlog events.
  2. The slave copies the master's binary log events into its relay log.
  3. The slave replays the events from the relay log, applying the changes to its own data.

(Figure: MySQL master-slave replication flow)

Canal [kə'næl], meaning a waterway/pipe/channel, parses incremental logs from MySQL and provides incremental data subscription and consumption; it does not support full (initial) data synchronization. Because it is built on the binlog mechanism, Elasticsearch can react in real time to MySQL inserts, updates, and deletes.

(Figure: how Canal works)

Canal's main job breaks down into three steps:

  1. It emulates the MySQL slave protocol: posing as a MySQL slave, it sends the dump command to the MySQL master.
  2. On receiving the dump request, the MySQL master begins pushing binary logs to the slave (that is, to Canal).
  3. Canal parses the binary log objects (raw byte streams, which can be decoded into protocol messages for further processing).

Next, we'll use a docker-compose.yaml file to bring up MySQL 8.0.36, Elasticsearch 7.17.20, and Canal 1.1.7.

version: '3.8'

services:
  mysql:
    container_name: canal-demo-mysql
    image: mysql:8.0.36
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: 123456
    ports:
      - "3306:3306"
    networks:
      - canal-demo

  elasticsearch:
    container_name: canal-demo-elasticsearch
    image: elasticsearch:7.17.20
    restart: always
    environment:
      - "discovery.type=single-node"
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - canal-demo

  canal:
    container_name: canal-demo-canal
    image: canal/canal-server:v1.1.7
    restart: always
    ports:
      - "11111:11111"
    environment:
      - "CANAL_ADMIN_MANAGER=admin"
      - "CANAL_ADMIN_PORT=11111"
      - "CANAL_USER=admin"
      - "CANAL_PASS=admin"
    networks:
      - canal-demo

networks:
  canal-demo:
    driver: bridge

Run docker-compose up -d to start the stack. Then enter the MySQL container, create a database and a users table, and add an index on the name column to speed up queries.

docker exec -it <MySQL container ID> mysql -uroot -p
# Enter the password 123456 to reach the MySQL prompt
CREATE DATABASE mydb;
USE mydb;

CREATE TABLE `users` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `birthday` date NOT NULL,
  `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  INDEX `idx_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

After exiting the container, create a target mapping in Elasticsearch for the sync; its structure just needs to match the users table in MySQL. Since Elasticsearch exposes a RESTful API, you can use Postman or any other client; here we'll write it directly with curl.

docker exec -it <Elasticsearch container ID> /bin/bash

curl -X PUT "http://localhost:9200/users" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "name": {
        "type": "text",
        "analyzer": "standard"
      },
      "birthday": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "created_at": {
        "type": "date"
      },
      "updated_at": {
        "type": "date"
      }
    }
  }
}
'
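Note that the birthday field above uses the strict yyyy-MM-dd format, while MySQL accepts looser input such as 1999-2-15 and normalizes it on storage. If a pipeline ever needs to do the same normalization on the Go side before indexing, a small helper along these lines would work (a hypothetical sketch, not part of this article's sync code):

```go
package main

import (
	"fmt"
	"time"
)

// normalizeDate converts loosely formatted dates such as "1999-2-15"
// into the zero-padded "yyyy-MM-dd" form the Elasticsearch mapping expects.
func normalizeDate(s string) (string, error) {
	// the "2006-1-2" layout tolerates 1- or 2-digit month and day
	t, err := time.Parse("2006-1-2", s)
	if err != nil {
		return "", err
	}
	return t.Format("2006-01-02"), nil
}

func main() {
	out, _ := normalizeDate("1999-2-15")
	fmt.Println(out) // 1999-02-15
}
```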

Next, enter the Canal container and adjust the MySQL listening configuration.

docker exec -it <Canal container ID> /bin/bash
# After setup, Canal's default destination is "example"
cd canal-server/conf/example/
# instance.properties is that destination's configuration
vi instance.properties

Edit the instance.properties configuration file; three settings matter:

# Replace the default 127.0.0.1 here with the MySQL container name
canal.instance.master.address=canal-demo-mysql:3306
# MySQL username
canal.instance.dbUsername=root
# MySQL password
canal.instance.dbPassword=123456

Save the file and restart the Canal container. With that, the Canal listener and the MySQL and Elasticsearch environments are ready; next we'll write the sync logic in Go.

mkdir canal-demo
cd canal-demo
go mod init canal-demo
go get -u github.com/olivere/elastic/v7
go get -u github.com/withlin/canal-go
go get -u github.com/golang/protobuf/proto

Write main.go to run the Canal listener alongside the service:

package main

import (
    "canal-demo/controller"
    "fmt"
    "github.com/olivere/elastic/v7"
    "github.com/withlin/canal-go/client"
    "net/http"
    "time"
)

func main() {
    // Initialize the Elasticsearch client
    es, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://127.0.0.1:9200"), elastic.SetBasicAuth("", ""))
    if err != nil {
        fmt.Println("Error elastic connect: ", err)
        return
    }

    // Start the Canal client and listen for MySQL data changes
    canalClient := client.NewSimpleCanalConnector("127.0.0.1", 11111, "admin", "admin", "example", 60000, 60*60*1000)
    if err = canalClient.Connect(); err != nil {
        fmt.Println("Error canal connect: ", err)
        return
    }
    if err = canalClient.Subscribe("mydb\\.users"); err != nil {
        fmt.Println("Error canal subscribe: ", err)
        return
    }

    // Handle the data-change events Canal delivers
    go func() {
        for {
            message, err := canalClient.Get(100, nil, nil)
            if err != nil {
                fmt.Println("Error canal get message: ", err)
                return
            }

            if message.Id == -1 || len(message.Entries) <= 0 {
                // Canal polling interval; adjust as needed
                time.Sleep(1 * time.Second)
                continue
            }

            if err = controller.HandleMessage(message, es); err != nil {
                fmt.Println("Error handle message: ", err)
                return
            }
        }
    }()

    // net/http is used here to mimic a backend service that embeds the Canal listener.
    // If this is purely a standalone Canal listener, simply blocking the main goroutine is enough.
    if err := http.ListenAndServe(":8080", nil); err != nil {
        fmt.Println("Error starting server: ", err)
        return
    }
}

Next, let's flesh out the logic that handles the data-change events Canal picks up.

package controller

import (
    "canal-demo/utils"
    "fmt"
    "github.com/golang/protobuf/proto"
    elastic "github.com/olivere/elastic/v7"
    "github.com/withlin/canal-go/protocol"
    pe "github.com/withlin/canal-go/protocol/entry"
    "reflect"
)

// User mirrors a row of the users table
type User struct {
    Id       uint64 `json:"id"`
    Name     string `json:"name"`
    Birthday string `json:"birthday"`
    CreateAt string `json:"create_at"`
    UpdateAt string `json:"update_at"`
}

// BinLogEventTypeHandler executes one sync strategy
type BinLogEventTypeHandler interface {
    Handle(current, before User) error
}

// UserIndexName is the Elasticsearch index name
const UserIndexName = "users"

// HandleMessage processes a Canal message
func HandleMessage(message *protocol.Message, client *elastic.Client) error {
    for _, entry := range message.Entries {
        // Skip transaction begin/end entries
        if entry.GetEntryType() == pe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pe.EntryType_TRANSACTIONEND {
            continue
        }

        rowChange := new(pe.RowChange)
        if err := proto.Unmarshal(entry.GetStoreValue(), rowChange); err != nil {
            return err
        }

        eventType := rowChange.GetEventType()
        tableName := entry.Header.TableName
        for _, rowData := range rowChange.GetRowDatas() {
            switch tableName {
            case "users":
                var current, before User
                // Parse the row's before- and after-change images
                reflectParseColumn(&current, rowData.GetAfterColumns())
                reflectParseColumn(&before, rowData.GetBeforeColumns())

                // Pick the sync strategy for this binlog event type
                handler, ok := map[pe.EventType]BinLogEventTypeHandler{
                    pe.EventType_INSERT: NewBinLogInsertHandler(client),
                    pe.EventType_UPDATE: NewBinLogUpdateHandler(client),
                    pe.EventType_DELETE: NewBinLogDeleteHandler(client),
                }[eventType]
                if !ok {
                    // Skip event types we don't sync (e.g. DDL)
                    continue
                }
                err := handler.Handle(current, before)
                if err != nil {
                    return err
                }
            }
        }
    }

    return nil
}

// reflectParseColumn maps binlog columns onto struct fields via reflection
func reflectParseColumn[T User](t *T, columns []*pe.Column) {
    for _, column := range columns {
        err := utils.SetFieldByTag(t, "json", column.Name, column.Value)
        if err != nil {
            typ := reflect.TypeOf(t)
            fmt.Printf("SetFieldByTag type: %v, name: %s, value: %s, err: %v \n", typ, column.Name, column.Value, err)
        }
    }
}

Reflection is used here to read the struct's tag information and map each column value onto the matching field, keeping the synced data accurate:

package utils

import (
    "fmt"
    "reflect"
    "strconv"
)

func SetFieldByTag(obj interface{}, tagName, fieldName, fieldValue string) error {
    // Get the reflected value and type of the target object
    val := reflect.ValueOf(obj).Elem()
    typ := val.Type()

    // Look for the field whose tag matches
    for i := 0; i < typ.NumField(); i++ {
        field := typ.Field(i)
        tagValue := field.Tag.Get(tagName)

        // If the tag matches the given column name, try to set that field
        if tagValue == fieldName {
            fieldVal := val.Field(i)

            // Make sure the field is settable
            if !fieldVal.CanSet() {
                return fmt.Errorf("field '%s' is not settable", field.Name)
            }

            // Convert and set the value according to the field's kind
            switch field.Type.Kind() {
            case reflect.String:
                fieldVal.SetString(fieldValue)
            case reflect.Float32, reflect.Float64:
                floatVal, err := strconv.ParseFloat(fieldValue, 64)
                if err != nil {
                    return err
                }
                fieldVal.SetFloat(floatVal)
            case reflect.Int, reflect.Int16, reflect.Int32, reflect.Int64:
                intVal, err := strconv.ParseInt(fieldValue, 10, 64)
                if err != nil {
                    return err
                }
                fieldVal.SetInt(intVal)
            case reflect.Uint, reflect.Uint16, reflect.Uint32, reflect.Uint64:
                uintVal, err := strconv.ParseUint(fieldValue, 10, 64)
                if err != nil {
                    return err
                }
                fieldVal.SetUint(uintVal)
            case reflect.Bool:
                boolVal, err := strconv.ParseBool(fieldValue)
                if err != nil {
                    return err
                }
                fieldVal.SetBool(boolVal)
            default:
                return fmt.Errorf("unsupported field type: %v", field.Type.Kind())
            }

            return nil // Set successfully
        }
    }

    // No matching tag: silently ignore unmapped columns
    return nil
}
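To see the tag matching in action, here is a self-contained sketch: it inlines a trimmed copy of SetFieldByTag covering only the string and uint kinds that the User struct actually uses, then feeds it two simulated binlog columns.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
)

// setFieldByTag is a trimmed copy of utils.SetFieldByTag, handling only
// the string and uint kinds used by the User struct in this article.
func setFieldByTag(obj interface{}, tagName, fieldName, fieldValue string) error {
	val := reflect.ValueOf(obj).Elem()
	typ := val.Type()
	for i := 0; i < typ.NumField(); i++ {
		field := typ.Field(i)
		if field.Tag.Get(tagName) != fieldName {
			continue
		}
		fv := val.Field(i)
		switch field.Type.Kind() {
		case reflect.String:
			fv.SetString(fieldValue)
		case reflect.Uint, reflect.Uint64:
			u, err := strconv.ParseUint(fieldValue, 10, 64)
			if err != nil {
				return err
			}
			fv.SetUint(u)
		default:
			return fmt.Errorf("unsupported kind: %v", field.Type.Kind())
		}
		return nil
	}
	return nil // unmapped columns are ignored, as in the original
}

type user struct {
	Id   uint64 `json:"id"`
	Name string `json:"name"`
}

func main() {
	u := &user{}
	// Simulate two binlog columns arriving from Canal
	_ = setFieldByTag(u, "json", "id", "1")
	_ = setFieldByTag(u, "json", "name", "test")
	fmt.Printf("%+v\n", u) // &{Id:1 Name:test}
}
```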

Next, let's write the concrete sync handlers for insert, update, and delete.

First, insert:

package controller

import (
    "context"
    elastic "github.com/olivere/elastic/v7"
    "strconv"
)

type BinLogInsertHandler struct {
    client *elastic.Client
}

func NewBinLogInsertHandler(client *elastic.Client) *BinLogInsertHandler {
    return &BinLogInsertHandler{
        client: client,
    }
}

func (handler *BinLogInsertHandler) Handle(current, before User) error {
    // Index the newly inserted row, using the MySQL primary key as the document ID
    _, err := handler.client.Index().Index(UserIndexName).
        BodyJson(current).
        Id(strconv.FormatUint(current.Id, 10)).
        Do(context.TODO())

    return err
}

Then, update:

package controller

import (
    "context"
    elastic "github.com/olivere/elastic/v7"
)

type BinLogUpdateHandler struct {
    client *elastic.Client
}

func NewBinLogUpdateHandler(client *elastic.Client) *BinLogUpdateHandler {
    return &BinLogUpdateHandler{
        client: client,
    }
}

func (handler *BinLogUpdateHandler) Handle(current, before User) error {
    if current.Name != before.Name {
        // The mapped field is "name"; pass the new value as a script param
        // instead of string-formatting it into the script
        _, err := handler.client.UpdateByQuery(UserIndexName).
            Query(elastic.NewTermQuery("id", current.Id)).
            Script(elastic.NewScriptInline("ctx._source.name = params.name").Param("name", current.Name)).
            Do(context.TODO())
        if err != nil {
            return err
        }
    }

    if current.Birthday != before.Birthday {
        _, err := handler.client.UpdateByQuery(UserIndexName).
            Query(elastic.NewTermQuery("id", current.Id)).
            Script(elastic.NewScriptInline("ctx._source.birthday = params.birthday").Param("birthday", current.Birthday)).
            Do(context.TODO())
        if err != nil {
            return err
        }
    }

    return nil
}

Finally, delete:

package controller

import (
    "context"
    "errors"
    elastic "github.com/olivere/elastic/v7"
    "strconv"
)

type BinLogDeleteHandler struct {
    client *elastic.Client
}

func NewBinLogDeleteHandler(client *elastic.Client) *BinLogDeleteHandler {
    return &BinLogDeleteHandler{
        client: client,
    }
}

func (handler *BinLogDeleteHandler) Handle(current, before User) error {
    _, err := handler.client.Delete().Index(UserIndexName).
        // Note: on a DELETE event the after-image is empty, so use the ID from before
        Id(strconv.FormatUint(before.Id, 10)).
        Do(context.TODO())

    // A 404 only means the document is already gone; treat it as success
    var t *elastic.Error
    if errors.As(err, &t) && t.Status == 404 {
        return nil
    }
    return err
}
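The three handlers above are selected through the strategy map in HandleMessage. As a tiny standalone sketch of that dispatch pattern (stub handlers and a made-up event enum, purely illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

// eventType stands in for canal-go's pe.EventType in this sketch.
type eventType int

const (
	insertEvent eventType = iota
	updateEvent
	deleteEvent
	ddlEvent // an event type we deliberately don't handle
)

type handler func() string

// dispatch mirrors the strategy-map lookup in HandleMessage: unknown
// event types are reported instead of dereferencing a nil handler.
func dispatch(e eventType) (string, error) {
	handlers := map[eventType]handler{
		insertEvent: func() string { return "index document" },
		updateEvent: func() string { return "update by query" },
		deleteEvent: func() string { return "delete document" },
	}
	h, ok := handlers[e]
	if !ok {
		return "", errors.New("no handler for event type")
	}
	return h(), nil
}

func main() {
	action, _ := dispatch(insertEvent)
	fmt.Println(action) // index document
	if _, err := dispatch(ddlEvent); err != nil {
		fmt.Println("skipped:", err)
	}
}
```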

Start the newly written service. If you now check the example instance's log inside the Canal container, it will show:

2024-04-30 23:40:21.634 [canal-instance-scan-0] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....

...

2024-04-30 23:45:33.582 [New I/O server worker #1-1] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to mydb\.users
2024-04-30 23:45:33.610 [New I/O server worker #1-1] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^mydb\.users$

This indicates that the Canal listener has started successfully. Now let's run insert, update, and delete statements against the users table and check whether Elasticsearch mirrors each operation.

-- Run at the mysql prompt inside the MySQL container
INSERT INTO `users`(`name`,`birthday`) VALUES ('test','1999-2-15');
# Run inside the Elasticsearch container
curl -X GET "http://localhost:9200/users/_search" -H "Content-Type: application/json" -d '{
  "query": {
    "match_all": {}
  }
}'
// Response from Elasticsearch
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"users","_type":"_doc","_id":"1","_score":1.0,"_source":{"id":1,"name":"test","birthday":"1999-02-15","create_at":"","update_at":""}}]}}
-- Run at the mysql prompt inside the MySQL container
UPDATE `users` SET `birthday` = '2000-2-25' WHERE `name` = 'test';
# Run inside the Elasticsearch container
curl -X GET "http://localhost:9200/users/_search" -H "Content-Type: application/json" -d '{
  "query": {
    "match_all": {}
  }
}'
// Response from Elasticsearch
{"took":1,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"users","_type":"_doc","_id":"1","_score":1.0,"_source":{"birthday":"2000-02-25","name":"test","update_at":"","id":1,"create_at":""}}]}}
-- Run at the mysql prompt inside the MySQL container
DELETE FROM `users` WHERE `id` = 1;
# Run inside the Elasticsearch container
curl -X GET "http://localhost:9200/users/_search" -H "Content-Type: application/json" -d '{
  "query": {
    "match_all": {}
  }
}'
// Response from Elasticsearch
{"took":1,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}

As you can see, the synchronization works end to end: your Go application now listens for MySQL changes and mirrors them into Elasticsearch in real time.