input {
jdbc {
# MySQ驱动配置
jdbc_driver_library => "D:\apps\logstash-7.14.0\config\mysql-connector-java-5.1.42.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/db_name?characterEncoding=UTF-8&useSSL=false"
jdbc_user => root
jdbc_password => root
#是否进行分页
#jdbc_paging_enabled => "false"
#jdbc_page_size => "5000"
#增量查询参考字段(SQL中 :sql_last_value 变量会替换为该值)
tracking_column => "vr_id"
tracking_column_type => "numeric"
#这个文件需要自己创建出来 作用就是记录tracking_column的值(重启不会重置) 下次执行同步数据库的操作的时候直接从这个标识查起,文件名随意起
last_run_metadata_path => "D:\apps\logstash-7.14.0\config\last_run_metadata\textrepair.txt"
use_column_value => true
# statement_filepath => "sql文件路径,与下面的执行语句二选1"
statement => "SELECT * FROM table_name WHERE vr_id > :sql_last_value"
# 设置监听间隔 各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
# "*/5 * * * * *" 5秒钟执行一次
schedule => "*/5 * * * * *"
}
jdbc {
jdbc_driver_library => "D:\apps\logstash-7.14.0\config\mysql-connector-java-5.1.42.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/db_name?characterEncoding=UTF-8&useSSL=false"
jdbc_user => root
jdbc_password => root
#是否进行分页
#jdbc_paging_enabled => "false"
#jdbc_page_size => "5000"
#增量查询参考字段(SQL中 :sql_last_value 变量会替换为该值)
tracking_column => "vr_id"
tracking_column_type => "numeric"
#这个文件需要自己创建出来 作用就是记录tracking_column的值(重启不会重置) 下次执行同步数据库的操作的时候直接从这个标识查起,文件名随意起
last_run_metadata_path => "D:\apps\logstash-7.14.0\config\last_run_metadata\videorepair.txt"
use_column_value => true
# statement_filepath => "sql文件路径,与下面的执行语句二选1"
statement => "SELECT * FROM table_name WHERE vr_id > :sql_last_value"
# 设置监听间隔 各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "*/5 * * * * *"
}
jdbc {
#如果配置多个数据源需要用type来区分
type => "schematic_diagram"
jdbc_driver_library => "D:\apps\logstash-7.14.0\config\mysql-connector-java-5.1.42.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/db_name?characterEncoding=UTF-8&useSSL=false"
jdbc_user => root
jdbc_password => root
#是否进行分页
#jdbc_paging_enabled => "false"
#jdbc_page_size => "5000"
#增量查询参考字段(SQL中 :sql_last_value 变量会替换为该值)
tracking_column => "product_id"
tracking_column_type => "numeric"
#这个文件需要自己创建出来 作用就是记录tracking_column的值(重启不会重置) 下次执行同步数据库的操作的时候直接从这个标识查起,文件名随意起
last_run_metadata_path => "D:\apps\logstash-7.14.0\config\last_run_metadata\schematic_diagram.txt"
use_column_value => true
# statement_filepath => "sql文件路径,与下面的执行语句二选1"
statement => "SELECT * FROM table_name WHERE product_id > :sql_last_value"
# 设置监听间隔 各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "*/5 * * * * *"
}
jdbc {
#如果配置多个数据源需要用type来区分
type => "program"
jdbc_driver_library => "D:\apps\logstash-7.14.0\config\mysql-connector-java-5.1.42.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/db_name?characterEncoding=UTF-8&useSSL=false"
jdbc_user => root
jdbc_password => root
#是否进行分页
#jdbc_paging_enabled => "false"
#jdbc_page_size => "5000"
#增量查询参考字段(SQL中 :sql_last_value 变量会替换为该值)
tracking_column => "program_id"
tracking_column_type => "numeric"
#这个文件需要自己创建出来 作用就是记录tracking_column的值(重启不会重置) 下次执行同步数据库的操作的时候直接从这个标识查起,文件名随意起
last_run_metadata_path => "D:\apps\logstash-7.14.0\config\last_run_metadata\program.txt"
use_column_value => true
# statement_filepath => "sql文件路径,与下面的执行语句二选1"
statement => "SELECT * FROM table_name WHERE program_id> :sql_last_value"
# 设置监听间隔 各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "*/5 * * * * *"
}
}
output {
if[type]=="text"{
elasticsearch {
document_id => "%{vr_id}"
index => "knowledge_textrepaires"
hosts => ["localhost:9200"]
template_overwrite => true
}
}
if[type]=="video"{
elasticsearch {
document_id => "%{vr_id}"
index => "knowledge_videorepaires"
hosts => ["localhost:9200"]
template_overwrite => true
}
}
if[type]=="schematic_diagram"{
elasticsearch {
document_id => "%{product_unique_code}"
index => "knowledge_schematicdiagrames"
hosts => ["localhost:9200"]
template_overwrite => true
}
}
if[type]=="program"{
elasticsearch {
document_id => "%{program_id}"
index => "knowledge_programes"
hosts => ["localhost:9200"]
template_overwrite => true
}
}
# 这里输出调试,正式运行时可以注释掉
stdout{
codec => rubydebug
}
}
注意事项
- 如果SQL查询结果中存在type字段output中根据type控制输出位置使用的是查询返回的type,此时配置文件中的type无效,展现出来的效果会误认为通过if控制输出位置无效
运行环境
- logstash 7.14.0
- Elasticsearch 7.14.0
- MySQL 5.7