【Elasticsearch】Elasticsearch 与数据库实时同步方案

小破孩
2025-11-14 / 0 评论 / 3 阅读 / 正在检测是否收录...

在宝塔环境中实现 Elasticsearch 与数据库的实时同步,主要有以下几种方案:

一、同步方案概览

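| 方案 | 实时性 | 复杂度 | 数据一致性 | 适用场景 |
| --- | --- | --- | --- | --- |
| 应用层双写 | 最高 | 中等 | 最终一致 | 新建项目,可修改代码 |
| Logstash JDBC | 分钟级 | — | 延迟一致 | 已有项目,增量同步 |
| Canal | 秒级 | — | 最终一致 | MySQL 数据库,要求高实时性 |
| 数据库触发器 | 实时 | — | 强一致 | 数据库层面同步 |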

二、方案一:应用层双写(推荐)

实现原理

在业务代码中同时写入数据库和 Elasticsearch。

PHP 实现示例

<?php
/**
 * 应用层双写方案
 * 在写入数据库的同时写入 Elasticsearch
 */

require '/www/wwwroot/your-site/vendor/autoload.php';

use Elasticsearch\ClientBuilder;

/**
 * Dual-write service: every article change is written to MySQL first and then
 * mirrored into Elasticsearch.
 *
 * MySQL is the source of truth. When the Elasticsearch write fails, the
 * operation is appended to a JSON-lines failure log so that
 * retryFailedSyncs() can replay it later.
 */
class DualWriteService {
    /** Elasticsearch index that mirrors the `articles` table. */
    const INDEX = 'articles';

    /** Default path of the JSON-lines log holding failed Elasticsearch writes. */
    const FAILURE_LOG = '/www/wwwlogs/es_sync_failures.log';

    private $esClient;
    private $db;
    private $failureLog;

    /**
     * All arguments are optional; calling the constructor without arguments
     * builds the default Elasticsearch client and MySQL connection.
     *
     * @param object|null $esClient   Elasticsearch client exposing index(), update() and delete().
     * @param object|null $db         PDO-compatible connection.
     * @param string      $failureLog Path of the failure log file.
     */
    public function __construct($esClient = null, $db = null, $failureLog = self::FAILURE_LOG) {
        $this->failureLog = $failureLog;

        $this->esClient = $esClient !== null
            ? $esClient
            : ClientBuilder::create()
                ->setHosts(['localhost:9200'])
                ->build();

        if ($db !== null) {
            $this->db = $db;
        } else {
            $this->initDatabase();
        }
    }

    /**
     * Opens the MySQL connection used for all article writes.
     *
     * @throws Exception When the connection cannot be established.
     */
    private function initDatabase() {
        $host = 'localhost';
        $dbname = 'your_database';
        $username = 'your_username';
        $password = 'your_password';

        try {
            $this->db = new PDO(
                "mysql:host={$host};dbname={$dbname};charset=utf8mb4",
                $username,
                $password,
                [
                    PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
                    PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC
                ]
            );
        } catch (PDOException $e) {
            // Keep the original exception attached so the root cause is not lost.
            throw new Exception("数据库连接失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Inserts an article into MySQL and mirrors it into Elasticsearch.
     *
     * @param array $articleData Keys: title, content, author, category, tags (array or comma string), status (optional).
     * @return array ['success' => true, 'id' => ...] or ['success' => false, 'error' => ...] (database result only).
     */
    public function addArticle($articleData) {
        $dbResult = $this->writeToDatabase($articleData);

        if (!$dbResult['success']) {
            return $dbResult;
        }

        $esResult = $this->writeToElasticsearch($articleData, $dbResult['id']);
        if (!$esResult['success']) {
            // The row exists in MySQL; queue the Elasticsearch write for a later retry.
            $this->logSyncFailure('add', $dbResult['id'], $articleData);
        }

        return $dbResult;
    }

    /**
     * Updates an article in MySQL and mirrors the change into Elasticsearch.
     *
     * @param mixed $id         Article primary key.
     * @param array $updateData Same keys as addArticle().
     * @return array Database result: ['success' => bool, 'error' => string (on failure)].
     */
    public function updateArticle($id, $updateData) {
        $dbResult = $this->updateDatabase($id, $updateData);

        if ($dbResult['success']) {
            $esResult = $this->updateElasticsearch($id, $updateData);

            if (!$esResult['success']) {
                $this->logSyncFailure('update', $id, $updateData);
            }
        }

        return $dbResult;
    }

    /**
     * Deletes an article from MySQL and then from Elasticsearch.
     *
     * @param mixed $id Article primary key.
     * @return array Database result: ['success' => bool, 'error' => string (on failure)].
     */
    public function deleteArticle($id) {
        $dbResult = $this->deleteFromDatabase($id);

        if ($dbResult['success']) {
            $esResult = $this->deleteFromElasticsearch($id);

            if (!$esResult['success']) {
                $this->logSyncFailure('delete', $id);
            }
        }

        return $dbResult;
    }

    /**
     * Converts a tags value into the list form stored in Elasticsearch.
     * A null or empty string becomes an empty list instead of [''].
     */
    private function tagsToArray($tags) {
        if (is_array($tags)) {
            return $tags;
        }
        if ($tags === null || $tags === '') {
            return [];
        }

        return explode(',', $tags);
    }

    /**
     * Converts a tags value into the comma-separated form stored in MySQL.
     */
    private function tagsToColumn($tags) {
        return is_array($tags) ? implode(',', $tags) : $tags;
    }

    /**
     * Builds the uniform failure result returned by the private write helpers.
     */
    private function failure(Exception $e) {
        return [
            'success' => false,
            'error' => $e->getMessage()
        ];
    }

    /**
     * Inserts the article row.
     *
     * @return array ['success' => true, 'id' => lastInsertId] or a failure result.
     */
    private function writeToDatabase($data) {
        try {
            $sql = "INSERT INTO articles (title, content, author, category, tags, status, created_at, updated_at) 
                    VALUES (:title, :content, :author, :category, :tags, :status, NOW(), NOW())";

            $stmt = $this->db->prepare($sql);
            $stmt->execute([
                ':title' => $data['title'],
                ':content' => $data['content'],
                ':author' => $data['author'],
                ':category' => $data['category'],
                ':tags' => $this->tagsToColumn($data['tags'] ?? null),
                ':status' => $data['status'] ?? 1
            ]);

            return [
                'success' => true,
                'id' => $this->db->lastInsertId()
            ];
        } catch (Exception $e) {
            return $this->failure($e);
        }
    }

    /**
     * Indexes the full article document under the database ID.
     *
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function writeToElasticsearch($data, $id) {
        try {
            $now = date('Y-m-d H:i:s');

            $this->esClient->index([
                'index' => self::INDEX,
                'id' => $id,
                'body' => [
                    'id' => (int)$id,
                    'title' => $data['title'],
                    'content' => $data['content'],
                    'author' => $data['author'],
                    'category' => $data['category'],
                    'tags' => $this->tagsToArray($data['tags'] ?? null),
                    'status' => $data['status'] ?? 1,
                    'created_at' => $now,
                    'updated_at' => $now
                ]
            ]);

            return ['success' => true];
        } catch (Exception $e) {
            return $this->failure($e);
        }
    }

    /**
     * Overwrites all editable columns of the article row.
     *
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function updateDatabase($id, $data) {
        try {
            $sql = "UPDATE articles SET 
                    title = :title, 
                    content = :content, 
                    author = :author, 
                    category = :category, 
                    tags = :tags, 
                    status = :status, 
                    updated_at = NOW() 
                    WHERE id = :id";

            $stmt = $this->db->prepare($sql);
            $stmt->execute([
                ':title' => $data['title'],
                ':content' => $data['content'],
                ':author' => $data['author'],
                ':category' => $data['category'],
                ':tags' => $this->tagsToColumn($data['tags'] ?? null),
                ':status' => $data['status'] ?? 1,
                ':id' => $id
            ]);

            return ['success' => true];
        } catch (Exception $e) {
            return $this->failure($e);
        }
    }

    /**
     * Applies a partial-document update to the indexed article.
     *
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function updateElasticsearch($id, $data) {
        try {
            $this->esClient->update([
                'index' => self::INDEX,
                'id' => $id,
                'body' => [
                    'doc' => [
                        'title' => $data['title'],
                        'content' => $data['content'],
                        'author' => $data['author'],
                        'category' => $data['category'],
                        'tags' => $this->tagsToArray($data['tags'] ?? null),
                        'status' => $data['status'] ?? 1,
                        'updated_at' => date('Y-m-d H:i:s')
                    ]
                ]
            ]);

            return ['success' => true];
        } catch (Exception $e) {
            return $this->failure($e);
        }
    }

    /**
     * Deletes the article row.
     *
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function deleteFromDatabase($id) {
        try {
            $stmt = $this->db->prepare("DELETE FROM articles WHERE id = :id");
            $stmt->execute([':id' => $id]);

            return ['success' => true];
        } catch (Exception $e) {
            return $this->failure($e);
        }
    }

    /**
     * Removes the article document from the index.
     *
     * @return array ['success' => bool, 'error' => string (on failure)].
     */
    private function deleteFromElasticsearch($id) {
        try {
            $this->esClient->delete([
                'index' => self::INDEX,
                'id' => $id
            ]);

            return ['success' => true];
        } catch (Exception $e) {
            // An exception carrying code 404 means the document is already gone,
            // which is the state a delete is meant to reach; retrying would never succeed.
            if ((int)$e->getCode() === 404) {
                return ['success' => true];
            }

            return $this->failure($e);
        }
    }

    /**
     * Appends one failed Elasticsearch operation to the failure log as a JSON line.
     *
     * @param string     $operation 'add', 'update' or 'delete'.
     * @param mixed      $id        Article primary key.
     * @param array|null $data      Article payload (null for deletes).
     */
    private function logSyncFailure($operation, $id, $data = null) {
        $logData = [
            'timestamp' => date('Y-m-d H:i:s'),
            'operation' => $operation,
            'id' => $id,
            'data' => $data
        ];

        // LOCK_EX keeps lines written by concurrent requests from interleaving.
        file_put_contents(
            $this->failureLog,
            json_encode($logData, JSON_UNESCAPED_UNICODE | JSON_PARTIAL_OUTPUT_ON_ERROR) . "\n",
            FILE_APPEND | LOCK_EX
        );
    }

    /**
     * Replays one decoded failure-log entry against Elasticsearch.
     *
     * @param mixed $entry Decoded JSON line.
     * @return bool True when the operation succeeded.
     */
    private function replayEntry($entry) {
        if (!is_array($entry) || !isset($entry['operation'], $entry['id'])) {
            return false;
        }

        $operation = $entry['operation'];
        $payload = isset($entry['data']) ? $entry['data'] : null;

        if ($operation === 'delete') {
            $result = $this->deleteFromElasticsearch($entry['id']);
        } elseif (!is_array($payload)) {
            // 'add' and 'update' cannot be replayed without the article payload.
            return false;
        } elseif ($operation === 'add') {
            $result = $this->writeToElasticsearch($payload, $entry['id']);
        } elseif ($operation === 'update') {
            $result = $this->updateElasticsearch($entry['id'], $payload);
        } else {
            return false;
        }

        return !empty($result['success']);
    }

    /**
     * Replays every logged failure and keeps only the ones that fail again.
     *
     * The Elasticsearch helpers report failure through their return value, not
     * by throwing, so success is decided from that value.
     *
     * @return array ['success' => true, 'retried' => int, 'failed' => int],
     *               ['success' => true, 'message' => ...] when there is no log,
     *               or ['success' => false, 'error' => ...] when the log cannot be read.
     */
    public function retryFailedSyncs() {
        $logFile = $this->failureLog;

        if (!file_exists($logFile)) {
            return ['success' => true, 'message' => '无失败记录'];
        }

        // Move the log aside first: failures logged by other requests while the
        // retry runs land in a fresh file instead of being wiped afterwards.
        $pendingFile = $logFile . '.' . uniqid('retry', true);
        if (!rename($logFile, $pendingFile)) {
            return ['success' => false, 'error' => '无法读取失败日志: ' . $logFile];
        }

        $lines = file($pendingFile, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
        if ($lines === false) {
            return ['success' => false, 'error' => '无法读取失败日志: ' . $pendingFile];
        }

        $successCount = 0;
        $stillFailing = [];

        foreach ($lines as $line) {
            if ($this->replayEntry(json_decode($line, true))) {
                $successCount++;
            } else {
                // Unparseable lines are kept as well so an operator can inspect them.
                $stillFailing[] = $line;
            }
        }

        if ($stillFailing) {
            file_put_contents(
                $logFile,
                implode("\n", $stillFailing) . "\n",
                FILE_APPEND | LOCK_EX
            );
        }
        unlink($pendingFile);

        return [
            'success' => true,
            'retried' => $successCount,
            'failed' => count($stillFailing)
        ];
    }
}

// Usage example: create one article through the dual-write service.
$syncService = new DualWriteService();

// The payload is passed inline; tags may be given as a list.
$result = $syncService->addArticle([
    'title' => '测试文章标题',
    'content' => '这是文章内容...',
    'author' => '张三',
    'category' => '技术',
    'tags' => ['PHP', 'Elasticsearch', '搜索'],
    'status' => 1
]);

// Report the database outcome returned by addArticle().
echo $result['success']
    ? "文章添加成功,ID: " . $result['id']
    : "添加失败: " . $result['error'];
?>

三、方案二:Logstash JDBC 输入插件

安装和配置 Logstash

1. 在宝塔中安装 Logstash

# Run in the Baota terminal: all server software lives under /www/server
cd /www/server

# Download Logstash (the version must match the Elasticsearch version in use)
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.0-linux-x86_64.tar.gz

# Unpack and rename the directory to the version-independent path used below
tar -zxvf logstash-7.17.0-linux-x86_64.tar.gz
mv logstash-7.17.0 logstash

# Download and unpack the MySQL JDBC driver inside the Logstash directory
cd logstash
wget https://cdn.mysql.com/archives/mysql-connector-java-8.0/mysql-connector-java-8.0.28.tar.gz
tar -zxvf mysql-connector-java-8.0.28.tar.gz

2. 创建 Logstash 配置文件

创建 /www/server/logstash/config/mysql-sync.conf

input {
  jdbc {
    # MySQL connection settings
    jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=utf8&useSSL=false"
    jdbc_user => "your_username"
    jdbc_password => "your_password"
    
    # Path and class of the JDBC driver unpacked during installation
    jdbc_driver_library => "/www/server/logstash/mysql-connector-java-8.0.28/mysql-connector-java-8.0.28.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    
    # Fetch the result set in pages of 50000 rows
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    
    # Cron-style schedule: run the query once per minute
    schedule => "* * * * *"
    
    # Incremental query: rows created or updated after the last tracked value
    # NOTE(review): there is no ORDER BY on the tracking column; confirm that the
    # last row of each run carries the newest updated_at, otherwise rows may be
    # skipped or re-read on the next run.
    statement => "SELECT * FROM articles WHERE updated_at > :sql_last_value OR created_at > :sql_last_value"
    
    # Track the updated_at column value (a timestamp) as :sql_last_value
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "updated_at"
    
    # File in which the last tracked value is persisted between runs
    last_run_metadata_path => "/www/server/logstash/last_run_metadata.txt"
    
    # Time zone of the database timestamps
    jdbc_default_timezone => "Asia/Shanghai"
  }
}

filter {
  # Parse updated_at into the @timestamp field
  date {
    match => [ "updated_at", "yyyy-MM-dd HH:mm:ss" ]
    target => "@timestamp"
  }
  
  # Turn the comma-separated tags column into a list
  # NOTE(review): `tags` is also the field name Logstash itself uses for event
  # tags (e.g. filter failure markers) - confirm the two do not clash.
  if [tags] {
    mutate {
      gsub => [ "tags", ",", "|" ]
      split => { "tags" => "|" }
    }
  }
  
  # Drop fields that should not be indexed
  # NOTE(review): @timestamp is populated by the date filter above and removed
  # again here - confirm which of the two is intended.
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

output {
  # Write each row to Elasticsearch, using the row id as the document id
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "articles"
    document_id => "%{id}"
    
    # Index template that defines the mapping
    template => "/www/server/logstash/config/articles-template.json"
    template_name => "articles"
    template_overwrite => true
  }
  
  # Optional: also write events to a file for debugging
  file {
    path => "/www/server/logstash/logs/mysql-sync.log"
  }
}

3. 创建 Elasticsearch 映射模板

创建 /www/server/logstash/config/articles-template.json

{
  "index_patterns": ["articles*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "analysis": {
      "analyzer": {
        "ik_smart_analyzer": {
          "type": "custom",
          "tokenizer": "ik_smart"
        },
        "ik_max_analyzer": {
          "type": "custom",
          "tokenizer": "ik_max_word"
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id": {"type": "integer"},
      "title": {
        "type": "text",
        "analyzer": "ik_smart_analyzer",
        "search_analyzer": "ik_smart_analyzer"
      },
      "content": {
        "type": "text",
        "analyzer": "ik_max_analyzer",
        "search_analyzer": "ik_smart_analyzer"
      },
      "author": {"type": "keyword"},
      "category": {"type": "keyword"},
      "tags": {"type": "keyword"},
      "status": {"type": "integer"},
      "created_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"},
      "updated_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"}
    }
  }
}

4. 启动 Logstash

# Create the log directory
mkdir -p /www/server/logstash/logs

# Start Logstash in the foreground with the sync pipeline
cd /www/server/logstash
./bin/logstash -f config/mysql-sync.conf

# Or run it in the background, sending stdout and stderr to logs/logstash.log
nohup ./bin/logstash -f config/mysql-sync.conf > logs/logstash.log 2>&1 &

5. 创建宝塔计划任务

在宝塔面板中添加计划任务:

# Command: keep exactly one Logstash instance alive.
# The pipeline already runs its query every minute (its own `schedule` setting),
# so this task must not start a second long-running Logstash each time it fires.
# `flock -n` exits immediately while a previous instance still holds the lock.
cd /www/server/logstash && flock -n /tmp/logstash-mysql-sync.lock ./bin/logstash -f config/mysql-sync.conf >> logs/logstash.log 2>&1

# Schedule: every minute

四、方案三:Canal(MySQL Binlog 同步)

安装和配置 Canal

1. 安装 Canal

# Download the Canal deployer package, unpack it, and rename the result to /www/server/canal
cd /www/server
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
tar -zxvf canal.deployer-1.1.6.tar.gz
mv canal.deployer-1.1.6 canal

2. 配置 MySQL 开启 Binlog

-- Check whether the binary log is enabled
SHOW VARIABLES LIKE 'log_bin';

-- If it is not enabled, add the following to the MySQL configuration file
-- /etc/my.cnf
-- (the lines below are my.cnf settings, not SQL statements)
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

3. 创建 Canal 用户

-- Dedicated account that Canal uses to connect to MySQL (allowed from any host)
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
-- Read access plus the replication privileges, granted on all databases
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. 配置 Canal

编辑 /www/server/canal/conf/example/instance.properties

# Database connection settings
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal_password
canal.instance.connectionCharset=UTF-8

# Tables to watch.
# This key must be assigned only once: in a properties file a later assignment
# of the same key replaces the earlier one. The catch-all default is therefore
# kept only as a comment, and the dot between schema and table is escaped
# because the value is a regular expression.
# canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=your_database\\.articles
canal.instance.filter.black.regex=

5. 创建 PHP Canal 客户端

<?php
/**
 * Canal PHP client.
 * Listens to MySQL binlog changes through Canal and mirrors them into Elasticsearch.
 */

// This script runs on its own, so it needs the Composer autoloader and the
// ClientBuilder import, the same way the dual-write script loads them.
require '/www/wwwroot/your-site/vendor/autoload.php';

use Elasticsearch\ClientBuilder;

/**
 * Consumes row-change events from a Canal server and mirrors them into the
 * `articles` Elasticsearch index.
 */
class CanalClient {
    /** Elasticsearch index that mirrors the `articles` table. */
    const INDEX = 'articles';

    /** JSON-lines log of operations that could not be applied to Elasticsearch. */
    const FAILURE_LOG = '/www/wwwlogs/canal_sync_failures.log';

    private $esClient;

    /**
     * @param object|null $esClient Optional Elasticsearch client exposing index(),
     *                              update() and delete(); built with defaults when omitted.
     */
    public function __construct($esClient = null) {
        // Fully qualified so the class does not depend on a `use` statement in the including file.
        $this->esClient = $esClient !== null
            ? $esClient
            : \Elasticsearch\ClientBuilder::create()
                ->setHosts(['localhost:9200'])
                ->build();
    }

    /**
     * Connects to Canal and processes row changes forever.
     */
    public function startSync() {
        // Connect to the Canal server and subscribe to the articles table
        $client = new \Canal\Client();
        $client->connect("127.0.0.1", 11111);
        $client->subscribe("example", "your_database.articles", "");

        while (true) {
            $message = $client->get(100);
            $entries = $message->getEntries();
            $seen = 0;

            if ($entries) {
                foreach ($entries as $entry) {
                    $seen++;
                    if ($entry->getEntryType() == \Canal\Protocol\EntryType::ROWDATA) {
                        $this->processRowChange($entry);
                    }
                }
            }

            // Back off only when the batch was empty; sleeping after every
            // batch would delay changes that are already waiting.
            if ($seen === 0) {
                sleep(1);
            }
        }
    }

    /**
     * Dispatches every row of one binlog entry to the handler for its event type.
     */
    private function processRowChange($entry) {
        $rowChange = \Canal\Protocol\RowChange::parseFromString($entry->getStoreValue());

        foreach ($rowChange->getRowDatasList() as $rowData) {
            switch ($rowChange->getEventType()) {
                case \Canal\Protocol\EventType::INSERT:
                    $this->handleInsert($rowData);
                    break;
                case \Canal\Protocol\EventType::UPDATE:
                    $this->handleUpdate($rowData);
                    break;
                case \Canal\Protocol\EventType::DELETE:
                    $this->handleDelete($rowData);
                    break;
            }
        }
    }

    /**
     * Flattens a list of column objects (getName()/getValue()) into a name => value map.
     */
    private function columnsToArray($columns) {
        $row = [];

        foreach ($columns as $column) {
            $row[$column->getName()] = $column->getValue();
        }

        return $row;
    }

    /**
     * Indexes the inserted row.
     */
    private function handleInsert($rowData) {
        $data = $this->columnsToArray($rowData->getAfterColumnsList());

        // Pass the ID explicitly so the success/failure log lines carry it.
        $this->syncToElasticsearch('index', $data, isset($data['id']) ? $data['id'] : null);
    }

    /**
     * Applies the post-update row image to the indexed document.
     */
    private function handleUpdate($rowData) {
        $data = $this->columnsToArray($rowData->getAfterColumnsList());

        $this->syncToElasticsearch('update', $data, isset($data['id']) ? $data['id'] : null);
    }

    /**
     * Removes the document of the deleted row, identified by the pre-delete row image.
     */
    private function handleDelete($rowData) {
        $before = $this->columnsToArray($rowData->getBeforeColumnsList());

        $this->syncToElasticsearch('delete', null, isset($before['id']) ? $before['id'] : null);
    }

    /**
     * Applies one operation to Elasticsearch and logs it when it fails.
     *
     * @param string     $operation 'index', 'update' or 'delete'.
     * @param array|null $data      Row data (null for deletes).
     * @param mixed      $id        Document ID; for 'index' it falls back to $data['id'].
     */
    private function syncToElasticsearch($operation, $data = null, $id = null) {
        if ($id === null && $operation === 'index' && isset($data['id'])) {
            $id = $data['id'];
        }

        try {
            switch ($operation) {
                case 'index':
                    $this->esClient->index([
                        'index' => self::INDEX,
                        'id' => $id,
                        'body' => $this->transformData($data)
                    ]);
                    break;

                case 'update':
                    $this->esClient->update([
                        'index' => self::INDEX,
                        'id' => $id,
                        'body' => [
                            'doc' => $this->transformData($data),
                            // The full row is sent, so create the document when
                            // an earlier insert never reached the index.
                            'doc_as_upsert' => true
                        ]
                    ]);
                    break;

                case 'delete':
                    try {
                        $this->esClient->delete([
                            'index' => self::INDEX,
                            'id' => $id
                        ]);
                    } catch (Exception $e) {
                        // Code 404 means the document is already absent, which is
                        // the state a delete is meant to reach.
                        if ((int)$e->getCode() !== 404) {
                            throw $e;
                        }
                    }
                    break;
            }

            echo "同步成功: {$operation} ID: {$id}\n";
        } catch (Exception $e) {
            echo "同步失败: {$e->getMessage()}\n";
            $this->logFailure($operation, $id, $data);
        }
    }

    /**
     * Maps a database row (all values as delivered by Canal) to the indexed document.
     */
    private function transformData($data) {
        $tags = isset($data['tags']) ? $data['tags'] : '';

        return [
            'id' => (int)$data['id'],
            'title' => $data['title'],
            'content' => $data['content'],
            'author' => $data['author'],
            'category' => $data['category'],
            // Compare against '' rather than using empty(), which would also drop a tag list of "0".
            'tags' => ($tags !== null && $tags !== '') ? explode(',', $tags) : [],
            'status' => (int)$data['status'],
            'created_at' => $data['created_at'],
            'updated_at' => $data['updated_at']
        ];
    }

    /**
     * Appends a failed operation to the failure log as one JSON line.
     */
    private function logFailure($operation, $id, $data) {
        // LOCK_EX keeps lines from concurrent writers from interleaving.
        file_put_contents(
            self::FAILURE_LOG,
            json_encode([
                'timestamp' => date('Y-m-d H:i:s'),
                'operation' => $operation,
                'id' => $id,
                'data' => $data
            ]) . "\n",
            FILE_APPEND | LOCK_EX
        );
    }
}

// Entry point: build the client and enter its sync loop.
(new CanalClient())->startSync();
?>

五、方案四:数据库触发器 + 消息队列

1. 创建消息表

-- Queue of row changes waiting to be mirrored into Elasticsearch.
-- One row is added per changed record; sync_status tracks its processing state.
CREATE TABLE es_sync_queue (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    record_id BIGINT NOT NULL,
    operation ENUM('INSERT', 'UPDATE', 'DELETE') NOT NULL,
    sync_status TINYINT DEFAULT 0 COMMENT '0:未同步, 1:已同步, 2:同步失败',
    -- Written when a record is marked as failed (sync_status = 2)
    error_message TEXT NULL COMMENT '最近一次同步失败的错误信息',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status (sync_status),
    INDEX idx_created (created_at)
);

2. 创建数据库触发器

-- INSERT trigger: queue an 'INSERT' entry for every new articles row
DELIMITER $$
CREATE TRIGGER after_article_insert
AFTER INSERT ON articles
FOR EACH ROW
BEGIN
    INSERT INTO es_sync_queue (table_name, record_id, operation, sync_status)
    VALUES ('articles', NEW.id, 'INSERT', 0);
END$$
DELIMITER ;

-- UPDATE trigger: queue an 'UPDATE' entry for every modified articles row
DELIMITER $$
CREATE TRIGGER after_article_update
AFTER UPDATE ON articles
FOR EACH ROW
BEGIN
    INSERT INTO es_sync_queue (table_name, record_id, operation, sync_status)
    VALUES ('articles', NEW.id, 'UPDATE', 0);
END$$
DELIMITER ;

-- DELETE trigger: queue a 'DELETE' entry; OLD.id is used because the row no longer exists
DELIMITER $$
CREATE TRIGGER after_article_delete
AFTER DELETE ON articles
FOR EACH ROW
BEGIN
    INSERT INTO es_sync_queue (table_name, record_id, operation, sync_status)
    VALUES ('articles', OLD.id, 'DELETE', 0);
END$$
DELIMITER ;

3. PHP 消息队列处理器

<?php
/**
 * Queue-based sync processor: drains es_sync_queue into Elasticsearch.
 */

// This script runs on its own, so it needs the Composer autoloader and the
// ClientBuilder import, the same way the dual-write script loads them.
require '/www/wwwroot/your-site/vendor/autoload.php';

use Elasticsearch\ClientBuilder;

/**
 * Drains the es_sync_queue table (filled by the database triggers) into
 * Elasticsearch.
 *
 * Each queue record names a table, a row ID and an operation. Upserts re-read
 * the current row from the database, so processing a record twice is harmless.
 */
class QueueSyncService {
    private $db;
    private $esClient;

    /**
     * Both arguments are optional; without them the default MySQL connection
     * and Elasticsearch client are created.
     *
     * @param object|null $db       PDO-compatible connection.
     * @param object|null $esClient Elasticsearch client exposing index() and delete().
     */
    public function __construct($db = null, $esClient = null) {
        if ($db !== null) {
            $this->db = $db;
        } else {
            $this->initDatabase();
        }

        // Fully qualified so the class does not depend on a `use` statement in the including file.
        $this->esClient = $esClient !== null
            ? $esClient
            : \Elasticsearch\ClientBuilder::create()
                ->setHosts(['localhost:9200'])
                ->build();
    }

    /**
     * Opens the MySQL connection. The constructor has always called this
     * method, but the class never defined it.
     *
     * @throws Exception When the connection cannot be established.
     */
    private function initDatabase() {
        $host = 'localhost';
        $dbname = 'your_database';
        $username = 'your_username';
        $password = 'your_password';

        try {
            $this->db = new PDO(
                "mysql:host={$host};dbname={$dbname};charset=utf8mb4",
                $username,
                $password,
                [
                    PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
                    PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC
                ]
            );
        } catch (PDOException $e) {
            throw new Exception("数据库连接失败: " . $e->getMessage(), 0, $e);
        }
    }

    /**
     * Processes up to $batchSize pending queue records, oldest first.
     *
     * @param int $batchSize Maximum number of records to fetch.
     * @return int Number of records fetched (successful or not).
     */
    public function processQueue($batchSize = 100) {
        // No transaction is opened here, so the FOR UPDATE lock does not outlive
        // this statement; overlapping runs may pick up the same records.
        $sql = "SELECT * FROM es_sync_queue 
                WHERE sync_status = 0 
                ORDER BY id ASC 
                LIMIT :limit 
                FOR UPDATE";

        $stmt = $this->db->prepare($sql);
        $stmt->bindValue(':limit', $batchSize, PDO::PARAM_INT);
        $stmt->execute();

        $records = $stmt->fetchAll();

        foreach ($records as $record) {
            $this->processRecord($record);
        }

        return count($records);
    }

    /**
     * Applies one queue record to Elasticsearch and stores the outcome in the queue.
     *
     * @param array $record Row of es_sync_queue.
     * @return bool True when the record was synced, false when it was marked as failed.
     */
    private function processRecord($record) {
        try {
            switch ($record['operation']) {
                case 'INSERT':
                case 'UPDATE':
                    $this->syncUpsert($record);
                    break;
                case 'DELETE':
                    $this->syncDelete($record);
                    break;
            }

            $this->markAsSynced($record['id']);

            return true;
        } catch (Exception $e) {
            $this->markAsFailed($record['id'], $e->getMessage());

            return false;
        }
    }

    /**
     * Indexes the current database row of the record; does nothing when the row no longer exists.
     */
    private function syncUpsert($record) {
        // Always read the latest row so that several queued changes collapse into the newest state.
        $data = $this->getRecordData($record['table_name'], $record['record_id']);

        if ($data) {
            $this->esClient->index([
                'index' => $record['table_name'],
                'id' => $record['record_id'],
                'body' => $this->transformData($data)
            ]);
        }
    }

    /**
     * Deletes the document of the record from the index.
     *
     * @throws Exception Any client error other than "document not found".
     */
    private function syncDelete($record) {
        try {
            $this->esClient->delete([
                'index' => $record['table_name'],
                'id' => $record['record_id']
            ]);
        } catch (Exception $e) {
            // Code 404 means the document is already absent; otherwise the
            // record would be marked as failed on every retry.
            if ((int)$e->getCode() !== 404) {
                throw $e;
            }
        }
    }

    /**
     * Loads one row by primary key from the given table.
     *
     * @throws InvalidArgumentException When the table name is not a plain identifier.
     * @return array|false The row, or false when it does not exist.
     */
    private function getRecordData($tableName, $recordId) {
        // Identifiers cannot be bound as parameters, so restrict the name to
        // safe characters before placing it in the statement.
        if (!is_string($tableName) || !preg_match('/^[A-Za-z0-9_]+$/', $tableName)) {
            throw new InvalidArgumentException('非法表名');
        }

        $stmt = $this->db->prepare("SELECT * FROM `{$tableName}` WHERE id = :id");
        $stmt->execute([':id' => $recordId]);

        return $stmt->fetch();
    }

    /**
     * Adapts a database row to the indexed document (tags string becomes a list).
     */
    private function transformData($data) {
        if (isset($data['tags']) && !empty($data['tags'])) {
            $data['tags'] = explode(',', $data['tags']);
        }

        return $data;
    }

    /**
     * Marks a queue record as synced (sync_status = 1).
     */
    private function markAsSynced($queueId) {
        $stmt = $this->db->prepare("UPDATE es_sync_queue SET sync_status = 1 WHERE id = :id");
        $stmt->execute([':id' => $queueId]);
    }

    /**
     * Marks a queue record as failed (sync_status = 2) and stores the error text.
     * The statement writes to an `error_message` column of es_sync_queue.
     */
    private function markAsFailed($queueId, $error) {
        $stmt = $this->db->prepare(
            "UPDATE es_sync_queue SET sync_status = 2, error_message = :error WHERE id = :id"
        );
        $stmt->execute([
            ':id' => $queueId,
            ':error' => $error
        ]);
    }

    /**
     * Re-processes records previously marked as failed.
     *
     * @param int $batchSize Maximum number of failed records to retry.
     * @return int Number of records that synced successfully this time.
     */
    public function retryFailed($batchSize = 50) {
        $sql = "SELECT * FROM es_sync_queue 
                WHERE sync_status = 2 
                ORDER BY id ASC 
                LIMIT :limit";

        $stmt = $this->db->prepare($sql);
        $stmt->bindValue(':limit', $batchSize, PDO::PARAM_INT);
        $stmt->execute();

        $records = $stmt->fetchAll();
        $successCount = 0;

        foreach ($records as $record) {
            try {
                // processRecord() reports failure through its return value;
                // only real successes are counted.
                if ($this->processRecord($record)) {
                    $successCount++;
                }
            } catch (Exception $e) {
                // Updating the queue row itself failed; keep going with the
                // remaining records but leave a trace.
                error_log('es_sync_queue retry error: ' . $e->getMessage());
            }
        }

        return $successCount;
    }
}

// Entry point, meant to be run from a Baota scheduled task.
$queueWorker = new QueueSyncService();

// First pass: handle up to 100 pending records.
$pendingHandled = $queueWorker->processQueue(100);
echo "已处理 {$pendingHandled} 条同步记录";

// Second pass: retry up to 50 records that failed earlier.
$retrySucceeded = $queueWorker->retryFailed(50);
echo "重试成功 {$retrySucceeded} 条记录";
?>

六、方案对比和选择建议

方案选择指南

| 场景 | 推荐方案 | 理由 |
| --- | --- | --- |
| 新建项目 | 应用层双写 | 代码可控,实时性最好 |
| 已有项目,可修改代码 | 应用层双写 + 全量初始化 | 渐进式改造 |
| MySQL 数据库,要求高实时性 | Canal | 基于 Binlog,对业务无侵入 |
| 简单同步,可接受分钟级延迟 | Logstash JDBC | 配置简单,稳定可靠 |
| 数据库层面同步 | 触发器 + 消息队列 | 强一致性,对应用透明 |

宝塔环境推荐配置

对于大多数宝塔用户,我推荐:

  1. 首选:应用层双写方案
  2. 备选:Logstash JDBC 方案
  3. 高级需求:Canal 方案

性能优化建议

// 批量操作优化
/**
 * Indexes many records with a single bulk request.
 *
 * @param array $records Rows to index; each must contain an 'id' key.
 * @return array The bulk API response. Check its 'errors' flag: a bulk request
 *               can succeed as a whole while individual items fail. For an
 *               empty $records no request is sent and an empty, error-free
 *               response is returned.
 */
public function bulkSync($records) {
    // Elasticsearch rejects a bulk request that contains no actions.
    if (empty($records)) {
        return ['took' => 0, 'errors' => false, 'items' => []];
    }

    $params = ['body' => []];

    foreach ($records as $record) {
        // Each document takes two entries: the action line, then the source.
        $params['body'][] = [
            'index' => [
                '_index' => 'articles',
                '_id' => $record['id']
            ]
        ];
        $params['body'][] = $this->transformData($record);
    }

    return $this->esClient->bulk($params);
}

选择最适合你项目需求和团队技术能力的方案。对于宝塔环境,应用层双写和 Logstash 方案相对更容易实施和维护。

0

评论 (0)

取消