当前位置:首页 - Elastic

Centos下 用Logstash同步mysql数据到Es

作者:高景洋 日期:2021-05-13 00:19:32 浏览次数:1483

1、Logstash 下载地址

https://artifacts.elastic.co/downloads/logstash/logstash-6.4.0.zip  

2、解压LogStash

unzip logstash-6.4.0.zip

:如果未安装zip 解压工具,请执行 yum install -y unzip zip 命令

3、进入到logstash bin目录

cd logstash-6.4.0/bin

4、安装logstash-jdbc

./logstash-plugin install logstash-input-jdbc

执行本步骤时,需要java虚拟环境,如 未安装java虚拟环境,请参考以下链接安装

centos7安装配置java环境变量


5、编写配置文件(jdbc.sql和jdbc.conf,建议在bin目录下vim jdbc.conf)


mysql 驱动包下载地址:https://downloads.mysql.com/archives/c-j/


1)jdbc.conf 文件

input {
    stdin {
    }
    jdbc {
      # 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
      jdbc_connection_string => "jdbc:mysql://mysql ip:3306/数据库名?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
      # 你的账户密码
      jdbc_user => "mysql 用户名"
      jdbc_password => "mysql 密码"
      # 连接数据库的驱动包,建议使用绝对地址
      jdbc_driver_library => "/root/soft/logstash-6.4.0/bin/mysql/mysql-connector-java-5.1.45-bin.jar"
      # 这是不用动就好
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
 
      #处理中文乱码问题
      codec => plain { charset => "UTF-8"}
 
      #使用其它字段追踪,而不是用时间
      use_column_value => true
      #追踪的字段      
      tracking_column => LogicID      
      record_last_run => true     
      #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值     
      last_run_metadata_path => "/root/soft/logstash-6.4.0/bin/mysql/station_parameter.txt"
 
      jdbc_default_timezone => "Asia/Shanghai"

      lowercase_column_names => "false"
 
      statement_filepath => "/root/soft/logstash-6.4.0/bin/mysql/jdbc.sql"
      
 
      #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
      clean_run => false
 
      # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟
      schedule => "15 23 * * *"
      type => "jdbc"
    }
}
 
 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
 
 
output {
    elasticsearch {
        # 要导入到的Elasticsearch所在的主机
        hosts => "10.10.10.10:9200"
        # 要导入到的Elasticsearch的索引的名称
        index => "product_index1"
        # 类型名称(类似数据库表名)
        document_type => "product"
        # 主键名称(类似数据库主键)
        document_id => "%{logicid}"
        # es 账号
        user => "es用户名"
        password => "es 密码"
        
    }
 
    stdout {
        # JSON格式输出
        codec => json_lines
    }
    
}

2) jdbc.sql

注:mysql 中的大写字段转到es中会变小写

SELECT LogicID,
Title,
UserID,
ViewCount,
Status,
Url,
WebsiteID,
Price,
CategoryID1,
CategoryName1,
CategoryID2,
CategoryName2,
CategoryID3,
CategoryName3,
CategoryID4,
GoodReviewRate,
ReviewQty,
IsDeleted,
GoodReviewQty,
BrandID,
BrandName,
PriceUpdatedDate,
PagePrice,
ImageUrls,
ImageUrl,
ShopID,
ShopName,
UpdatedDate,
EnteredDate
FROM
product1

3)station_parameter.txt 文件内容为从哪一条数据,开始写es

A10555137


6、执行logstash 脚本

./logstash -f mysql/jdbc.conf


nohup 方式:nohup ./logstash -f mysql/jdbc.conf > log3.log 2>&1 &

本文永久性链接:
<a href="http://r4.com.cn/art186.aspx">Centos下 用Logstash同步mysql数据到Es</a>
当前header:Host: r4.com.cn X-Host1: r4.com.cn X-Host2: r4.com.cn X-Host3: 127.0.0.1:8080 X-Forwarded-For: 34.239.185.22 X-Real-Ip: 34.239.185.22 X-Domain: r4.com.cn X-Request: GET /art186.aspx HTTP/1.1 X-Request-Uri: /art186.aspx Connection: close Accept: */* User-Agent: claudebot