This time we will stand up an Elasticsearch search engine and use Canal, Alibaba's open-source middleware, to sync user data from MySQL in real time.
As a refresher, MySQL master/slave replication works like this:
- The master writes data changes to its binary log as binary log events, which you can inspect with show binlog events.
- The slave copies the master's binary log events into its relay log.
- The slave replays the events in the relay log, applying the changes to its own data.
Canal [kə'næl], meaning waterway/pipeline/channel, is built for incremental log parsing of MySQL databases: it provides incremental data subscription and consumption, and does not support full-data synchronization. Because it rides on the binlog mechanism, inserts, updates, and deletes in MySQL can be reflected in Elasticsearch in real time.
Canal carries out three main tasks:
- It speaks the MySQL slave protocol: it masquerades as a MySQL slave and sends a dump request to the MySQL master.
- On receiving the dump request, the master starts pushing the binary log to the slave (i.e., Canal).
- Canal parses the binary log objects (raw byte streams, which can be decoded into its protocol messages for processing).
Next, we use a docker-compose.yaml file to bring up MySQL 8.0.36, Elasticsearch 7.17.20, and Canal 1.1.7.
version: '3.8'
services:
  mysql:
    container_name: canal-demo-mysql
    image: mysql:8.0.36
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: 123456
    ports:
      - "3306:3306"
    networks:
      - canal-demo
  elasticsearch:
    container_name: canal-demo-elasticsearch
    image: elasticsearch:7.17.20
    restart: always
    environment:
      - "discovery.type=single-node"
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
      - "9300:9300"
    networks:
      - canal-demo
  canal:
    container_name: canal-demo-canal
    image: canal/canal-server:v1.1.7
    restart: always
    ports:
      - "11111:11111"
    environment:
      - "CANAL_ADMIN_MANAGER=admin"
      - "CANAL_ADMIN_PORT=11111"
      - "CANAL_USER=admin"
      - "CANAL_PASS=admin"
    networks:
      - canal-demo
networks:
  canal-demo:
    driver: bridge
Run docker-compose up -d to start the containers. Next, enter the MySQL container, create the database and the users table, and add an index on the name column to speed up queries.
docker exec -it <mysql-container-id> mysql -uroot -p
# enter the password 123456 to reach the MySQL prompt
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE `users` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
  `birthday` date NOT NULL,
  `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  INDEX `idx_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
After exiting the container, create the target mapping for the sync in Elasticsearch; its structure simply mirrors the users table in MySQL. Since Elasticsearch exposes a RESTful API, you could use Postman or any other tool; here we issue the request directly with curl.
docker exec -it <elasticsearch-container-id> /bin/bash
curl -X PUT "http://localhost:9200/users" -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "name": {
        "type": "text",
        "analyzer": "standard"
      },
      "birthday": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "created_at": {
        "type": "date"
      },
      "updated_at": {
        "type": "date"
      }
    }
  }
}
'
Next, enter the Canal container and adjust its MySQL listening configuration.
docker exec -it <canal-container-id> /bin/bash
# a fresh Canal install ships with a default destination named "example"
cd canal-server/conf/example/
# instance.properties holds that destination's configuration
vi instance.properties
Edit the instance.properties configuration file; three settings need to change:
# replace the default 127.0.0.1 with the MySQL container name
canal.instance.master.address=canal-demo-mysql:3306
# MySQL username
canal.instance.dbUsername=root
# MySQL password
canal.instance.dbPassword=123456
Save the file and restart the Canal container. With that, Canal is listening and the MySQL and Elasticsearch environments are in place; next we write the sync logic in Go.
mkdir canal-demo
cd canal-demo
go mod init canal-demo
go get -u github.com/olivere/elastic/v7
go get -u github.com/withlin/canal-go
go get -u github.com/golang/protobuf/proto
Write main.go to connect to Canal and run the service:
package main

import (
	"canal-demo/controller"
	"fmt"
	"net/http"
	"time"

	"github.com/olivere/elastic/v7"
	"github.com/withlin/canal-go/client"
)

func main() {
	// initialize the Elasticsearch client
	es, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://127.0.0.1:9200"), elastic.SetBasicAuth("", ""))
	if err != nil {
		fmt.Println("Error elastic connect: ", err)
		return
	}
	// connect to Canal to listen for MySQL data changes
	canalClient := client.NewSimpleCanalConnector("127.0.0.1", 11111, "admin", "admin", "example", 60000, 60*60*1000)
	if err = canalClient.Connect(); err != nil {
		fmt.Println("Error canal connect: ", err)
		return
	}
	if err = canalClient.Subscribe("mydb\\.users"); err != nil {
		fmt.Println("Error canal subscribe: ", err)
		return
	}
	// consume the change events Canal picks up
	go func() {
		for {
			message, err := canalClient.Get(100, nil, nil)
			if err != nil {
				fmt.Println("Error canal get message: ", err)
				return
			}
			if message.Id == -1 || len(message.Entries) <= 0 {
				// polling interval; tune to taste
				time.Sleep(1 * time.Second)
				continue
			}
			if err = controller.HandleMessage(message, es); err != nil {
				fmt.Println("Error handle message: ", err)
				return
			}
		}
	}()
	// net/http stands in for a backend service that embeds the Canal listener;
	// if this process were only a Canal listener, simply blocking the main goroutine would do
	if err := http.ListenAndServe(":8080", nil); err != nil {
		fmt.Println("Error starting server: ", err)
		return
	}
}
Next we flesh out the logic that handles the change events Canal delivers.
package controller

import (
	"canal-demo/utils"
	"fmt"
	"reflect"

	"github.com/golang/protobuf/proto"
	elastic "github.com/olivere/elastic/v7"
	"github.com/withlin/canal-go/protocol"
	pe "github.com/withlin/canal-go/protocol/entry"
)

// User mirrors a row of the users table; the json tags must match the MySQL column names
type User struct {
	Id       uint64 `json:"id"`
	Name     string `json:"name"`
	Birthday string `json:"birthday"`
	CreateAt string `json:"created_at"`
	UpdateAt string `json:"updated_at"`
}

// BinLogEventTypeHandler executes one sync strategy
type BinLogEventTypeHandler interface {
	Handle(current, before User) error
}

// UserIndexName is the Elasticsearch index name
const UserIndexName = "users"

// HandleMessage processes a Canal message
func HandleMessage(message *protocol.Message, client *elastic.Client) error {
	for _, entry := range message.Entries {
		// skip transaction begin/end entries
		if entry.GetEntryType() == pe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pe.EntryType_TRANSACTIONEND {
			continue
		}
		rowChange := new(pe.RowChange)
		if err := proto.Unmarshal(entry.GetStoreValue(), rowChange); err != nil {
			return err
		}
		eventType := rowChange.GetEventType()
		tableName := entry.Header.TableName
		for _, rowData := range rowChange.GetRowDatas() {
			switch tableName {
			case "users":
				var current, before User
				// decode the before- and after-images of the row
				reflectParseColumn(&current, rowData.GetAfterColumns())
				reflectParseColumn(&before, rowData.GetBeforeColumns())
				// pick the sync strategy for this event type
				handler := map[pe.EventType]BinLogEventTypeHandler{
					pe.EventType_INSERT: NewBinLogInsertHandler(client),
					pe.EventType_UPDATE: NewBinLogUpdateHandler(client),
					pe.EventType_DELETE: NewBinLogDeleteHandler(client),
				}[eventType]
				if handler == nil {
					// no strategy for this event type (e.g. DDL): nothing to sync
					continue
				}
				if err := handler.Handle(current, before); err != nil {
					return err
				}
			}
		}
	}
	return nil
}

// reflectParseColumn maps Canal columns onto a struct via reflection
func reflectParseColumn[T User](t *T, columns []*pe.Column) {
	for _, column := range columns {
		err := utils.SetFieldByTag(t, "json", column.Name, column.Value)
		if err != nil {
			typ := reflect.TypeOf(t)
			fmt.Printf("SetFieldByTag type: %v, name: %s, value: %s, err: %v\n", typ, column.Name, column.Value, err)
		}
	}
}
Reflection is used here to read the struct's tag metadata and map column names onto fields, keeping the synced data accurate:
package utils

import (
	"fmt"
	"reflect"
	"strconv"
)

// SetFieldByTag sets the field of obj whose tagName tag equals fieldName to fieldValue.
func SetFieldByTag(obj interface{}, tagName, fieldName, fieldValue string) error {
	// reflect over the pointed-to struct
	val := reflect.ValueOf(obj).Elem()
	typ := val.Type()
	// scan the fields for a matching tag
	for i := 0; i < typ.NumField(); i++ {
		field := typ.Field(i)
		tagValue := field.Tag.Get(tagName)
		// if the tag matches the given column name, try to set the field
		if tagValue == fieldName {
			fieldVal := val.Field(i)
			// the field must be settable (i.e. exported)
			if !fieldVal.CanSet() {
				return fmt.Errorf("field '%s' is not settable", field.Name)
			}
			// convert the string value according to the field's kind
			switch field.Type.Kind() {
			case reflect.String:
				fieldVal.SetString(fieldValue)
			case reflect.Float32, reflect.Float64:
				floatVal, err := strconv.ParseFloat(fieldValue, 64)
				if err != nil {
					return err
				}
				fieldVal.SetFloat(floatVal)
			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
				intVal, err := strconv.ParseInt(fieldValue, 10, 64)
				if err != nil {
					return err
				}
				fieldVal.SetInt(intVal)
			case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
				uintVal, err := strconv.ParseUint(fieldValue, 10, 64)
				if err != nil {
					return err
				}
				fieldVal.SetUint(uintVal)
			case reflect.Bool:
				boolVal, err := strconv.ParseBool(fieldValue)
				if err != nil {
					return err
				}
				fieldVal.SetBool(boolVal)
			default:
				return fmt.Errorf("unsupported field type: %v", field.Type.Kind())
			}
			return nil // field set successfully
		}
	}
	return nil // no matching tag: silently ignore the column
}
Next come the concrete insert, update, and delete sync handlers.
First, insert:
package controller

import (
	"context"
	"strconv"

	elastic "github.com/olivere/elastic/v7"
)

type BinLogInsertHandler struct {
	client *elastic.Client
}

func NewBinLogInsertHandler(client *elastic.Client) *BinLogInsertHandler {
	return &BinLogInsertHandler{
		client: client,
	}
}

func (handler *BinLogInsertHandler) Handle(current, before User) error {
	// index the after-image of the row, using the primary key as the document ID
	_, err := handler.client.Index().Index(UserIndexName).
		BodyJson(current).
		Id(strconv.FormatUint(current.Id, 10)).
		Do(context.TODO())
	return err
}
Then update:
package controller

import (
	"context"

	elastic "github.com/olivere/elastic/v7"
)

type BinLogUpdateHandler struct {
	client *elastic.Client
}

func NewBinLogUpdateHandler(client *elastic.Client) *BinLogUpdateHandler {
	return &BinLogUpdateHandler{
		client: client,
	}
}

func (handler *BinLogUpdateHandler) Handle(current, before User) error {
	if current.Name != before.Name {
		// pass the new value as a script param rather than interpolating it into the script
		_, err := handler.client.UpdateByQuery(UserIndexName).
			Query(elastic.NewTermQuery("id", current.Id)).
			Script(elastic.NewScriptInline("ctx._source.name = params.name").Param("name", current.Name)).
			Do(context.TODO())
		if err != nil {
			return err
		}
	}
	if current.Birthday != before.Birthday {
		_, err := handler.client.UpdateByQuery(UserIndexName).
			Query(elastic.NewTermQuery("id", current.Id)).
			Script(elastic.NewScriptInline("ctx._source.birthday = params.birthday").Param("birthday", current.Birthday)).
			Do(context.TODO())
		if err != nil {
			return err
		}
	}
	return nil
}
Finally, delete:
package controller

import (
	"context"
	"errors"
	"strconv"

	elastic "github.com/olivere/elastic/v7"
)

type BinLogDeleteHandler struct {
	client *elastic.Client
}

func NewBinLogDeleteHandler(client *elastic.Client) *BinLogDeleteHandler {
	return &BinLogDeleteHandler{
		client: client,
	}
}

func (handler *BinLogDeleteHandler) Handle(current, before User) error {
	// note: a DELETE event only carries a before-image, so use before.Id here
	_, err := handler.client.Delete().Index(UserIndexName).
		Id(strconv.FormatUint(before.Id, 10)).
		Do(context.TODO())
	// a 404 means the document is already gone; treat it as success
	var esErr *elastic.Error
	if errors.As(err, &esErr) && esErr.Status == 404 {
		return nil
	}
	return err
}
Start the newly written service. If you now check the example instance's log inside the Canal container, it will show
2024-04-30 23:40:21.634 [canal-instance-scan-0] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
...
2024-04-30 23:45:33.582 [New I/O server worker #1-1] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to mydb\.users
2024-04-30 23:45:33.610 [New I/O server worker #1-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^mydb\.users$
which means the Canal listener started successfully. Now perform an insert, an update, and a delete on the users table, and check whether Elasticsearch has mirrored each operation.
-- run at the MySQL prompt inside the container
INSERT INTO `users`(`name`,`birthday`) VALUES ('test','1999-2-15');
# run inside the Elasticsearch container
curl -X GET "http://localhost:9200/users/_search" -H "Content-Type: application/json" -d '{
  "query": {
    "match_all": {}
  }
}'
// response from Elasticsearch
{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"users","_type":"_doc","_id":"1","_score":1.0,"_source":{"id":1,"name":"test","birthday":"1999-02-15","create_at":"","update_at":""}}]}}
-- run at the MySQL prompt inside the container
UPDATE `users` SET `birthday` = '2000-2-25' WHERE `name` = 'test';
# run inside the Elasticsearch container
curl -X GET "http://localhost:9200/users/_search" -H "Content-Type: application/json" -d '{
  "query": {
    "match_all": {}
  }
}'
// response from Elasticsearch
{"took":1,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"users","_type":"_doc","_id":"1","_score":1.0,"_source":{"birthday":"2000-02-25","name":"test","update_at":"","id":1,"create_at":""}}]}}
-- run at the MySQL prompt inside the container
DELETE FROM `users` WHERE `id` = 1;
# run inside the Elasticsearch container
curl -X GET "http://localhost:9200/users/_search" -H "Content-Type: application/json" -d '{
  "query": {
    "match_all": {}
  }
}'
// response from Elasticsearch
{"took":1,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}
As you can see, the changes are mirrored end to end: your Go application now listens for MySQL changes and syncs them to Elasticsearch in real time.