logstash 配置jdbc数据源 output 到 Elasticsearch

文章目录[隐藏]

input {
  jdbc {
    # MySQ驱动配置
    jdbc_driver_library => "D:\apps\logstash-7.14.0\config\mysql-connector-java-5.1.42.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/db_name?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => root
    jdbc_password => root
    #是否进行分页
    #jdbc_paging_enabled => "false"
    #jdbc_page_size => "5000"
    #增量查询参考字段(SQL中 :sql_last_value 变量会替换为该值)
    tracking_column => "vr_id"
    tracking_column_type => "numeric"
    #这个文件需要自己创建出来 作用就是记录tracking_column的值(重启不会重置) 下次执行同步数据库的操作的时候直接从这个标识查起,文件名随意起
    last_run_metadata_path => "D:\apps\logstash-7.14.0\config\last_run_metadata\textrepair.txt"
    use_column_value => true
    # statement_filepath => "sql文件路径,与下面的执行语句二选1"
    statement => "SELECT * FROM table_name WHERE vr_id > :sql_last_value"
    # 设置监听间隔  各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
    # "*/5 * * * * *"  5秒钟执行一次
    schedule => "*/5 * * * * *"
  }
  
  jdbc {
    jdbc_driver_library => "D:\apps\logstash-7.14.0\config\mysql-connector-java-5.1.42.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/db_name?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => root
    jdbc_password => root
    #是否进行分页
    #jdbc_paging_enabled => "false"
    #jdbc_page_size => "5000"
    #增量查询参考字段(SQL中 :sql_last_value 变量会替换为该值)
    tracking_column => "vr_id"
    tracking_column_type => "numeric"
    #这个文件需要自己创建出来 作用就是记录tracking_column的值(重启不会重置) 下次执行同步数据库的操作的时候直接从这个标识查起,文件名随意起
    last_run_metadata_path => "D:\apps\logstash-7.14.0\config\last_run_metadata\videorepair.txt"
    use_column_value => true
    # statement_filepath => "sql文件路径,与下面的执行语句二选1"
    statement => "SELECT * FROM table_name WHERE vr_id > :sql_last_value"
    # 设置监听间隔  各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
    schedule => "*/5 * * * * *"
  }
  
  jdbc {
    #如果配置多个数据源需要用type来区分
    type => "schematic_diagram"
    jdbc_driver_library => "D:\apps\logstash-7.14.0\config\mysql-connector-java-5.1.42.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/db_name?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => root
    jdbc_password => root
    #是否进行分页
    #jdbc_paging_enabled => "false"
    #jdbc_page_size => "5000"
    #增量查询参考字段(SQL中 :sql_last_value 变量会替换为该值)
    tracking_column => "product_id"
    tracking_column_type => "numeric"
    #这个文件需要自己创建出来 作用就是记录tracking_column的值(重启不会重置) 下次执行同步数据库的操作的时候直接从这个标识查起,文件名随意起
    last_run_metadata_path => "D:\apps\logstash-7.14.0\config\last_run_metadata\schematic_diagram.txt"
    use_column_value => true
    # statement_filepath => "sql文件路径,与下面的执行语句二选1"
    statement => "SELECT * FROM table_name WHERE product_id > :sql_last_value"
    # 设置监听间隔  各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
    schedule => "*/5 * * * * *"
  }
  
  jdbc {
    #如果配置多个数据源需要用type来区分
    type => "program"
    jdbc_driver_library => "D:\apps\logstash-7.14.0\config\mysql-connector-java-5.1.42.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/db_name?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => root
    jdbc_password => root
    #是否进行分页
    #jdbc_paging_enabled => "false"
    #jdbc_page_size => "5000"
    #增量查询参考字段(SQL中 :sql_last_value 变量会替换为该值)
    tracking_column => "program_id"
    tracking_column_type => "numeric"
    #这个文件需要自己创建出来 作用就是记录tracking_column的值(重启不会重置) 下次执行同步数据库的操作的时候直接从这个标识查起,文件名随意起
    last_run_metadata_path => "D:\apps\logstash-7.14.0\config\last_run_metadata\program.txt"
    use_column_value => true
    # statement_filepath => "sql文件路径,与下面的执行语句二选1"
    statement => "SELECT * FROM table_name WHERE program_id> :sql_last_value"
    # 设置监听间隔  各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
    schedule => "*/5 * * * * *"
  }
}

output {
  if[type]=="text"{
    elasticsearch {
      document_id => "%{vr_id}"
      index => "knowledge_textrepaires"
      hosts => ["localhost:9200"]
      template_overwrite => true
    }
  }
  
  if[type]=="video"{
    elasticsearch {
      document_id => "%{vr_id}"
      index => "knowledge_videorepaires"
      hosts => ["localhost:9200"]
      template_overwrite => true
    }
  }
  
  if[type]=="schematic_diagram"{
    elasticsearch {
      document_id => "%{product_unique_code}"
      index => "knowledge_schematicdiagrames"
      hosts => ["localhost:9200"]
      template_overwrite => true
    }
  }
  
  if[type]=="program"{
    elasticsearch {
      document_id => "%{program_id}"
      index => "knowledge_programes"
      hosts => ["localhost:9200"]
      template_overwrite => true
    }
  }
  
  # 这里输出调试,正式运行时可以注释掉
  stdout{
    codec => rubydebug
  }
}

注意事项

  1. 如果SQL查询结果中存在type字段output中根据type控制输出位置使用的是查询返回的type,此时配置文件中的type无效,展现出来的效果会误认为通过if控制输出位置无效

运行环境

  • logstash 7.14.0
  • Elasticsearch 7.14.0
  • MySQL 5.7

发表评论