Created
July 16, 2020 06:14
-
-
Save ohoroyoi/49aeca68efbc242704b93969be6dcbcc to your computer and use it in GitHub Desktop.
logstash : input 3 pg table, output 4 es indexes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
input { | |
jdbc { | |
jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8" | |
jdbc_user => "atlas" | |
jdbc_passwor => "atlaspw" | |
jdbc_validate_connection => true | |
jdbc_driver_library => "DRIVER_PATH" | |
jdbc_driver_class => "org.postgresql.Driver" | |
schedule => "* * * * *" | |
statement => "SELECT region_id, region_type, country_code, country_code3, continent_code, source_from, | |
st_asgeojson(center_geo_point)::text as center_geo_point_text, center_longitude, center_latitude, jsonn::text | |
from expedia_region_union where source_timestamp + interval '9 hours' > :sql_last_value" | |
tracking_column => "source_timestamp" | |
tracking_column_type => "timestamp" | |
use_column_value => true | |
clean_run => false | |
last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run" | |
tags => ["expedia_region_union"] | |
} | |
jdbc { | |
jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8" | |
jdbc_user => "atlas" | |
jdbc_passwor => "atlaspw" | |
jdbc_validate_connection => true | |
jdbc_driver_library => "DRIVER_PATH" | |
jdbc_driver_class => "org.postgresql.Driver" | |
schedule => "*/10 * * * *" | |
statement => "select id, iata_airport_code, name, name_full, country_code, center_latitude, center_longitude, name_kr as name_korean, name_full_kr as name_korean_full, | |
st_asgeojson(center_geo_point)::text as center_geo_point_text, la, lo, public_flag, international_flag, source_from, | |
source_time, iata_airport_metro_code, (select json_build_object('region_id', region_id, 'region_name', region_name, | |
'region_name_kr', region_name_kr, 'region_type', region_type, 'region_code', region_code)::text from expedia_region_union where region_id = c.region_id) as haha_test from expedia_airport_more c" | |
jdbc_paging_enabled => true | |
jdbc_page_size => "100000" | |
tags => ["expedia_airport_more"] | |
} | |
jdbc { | |
jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8" | |
jdbc_user => "atlas" | |
jdbc_passwor => "atlaspw" | |
jdbc_validate_connection => true | |
jdbc_driver_library => "DRIVER_PATH" | |
jdbc_driver_class => "org.postgresql.Driver" | |
schedule => "*/10 * * * *" | |
statement => "select source_from, region_name_kr as region_name_korean, region_name_full_kr as region_name_korean_full, | |
region_id, region_name_Full, continent_code, region_type, descendants::text as descendants_text from expedia_region_continent"} | |
tags => ["expedia_region_continent"] | |
} | |
stdin { codec => plain { charset => "UTF-8" }} | |
filter { | |
if "expedia_region_union" in [tags] { | |
ruby { | |
code => " | |
require 'json' | |
begin | |
center_geo_point_json = JSON.parse(event.get('center_geo_point_text') || '{}') | |
event.set('point', center_geo_point_json) | |
event.remove('center_geo_point_text') | |
rescue Exception => e | |
# event.tag('malfunctioned center_geo_point') | |
end | |
begin | |
location = { lat: event.get('center_latitude').to_f, lon: event.get('center_longitude').to_f} | |
event.set('location', JSON(location.to_json)) | |
event.set('center_longitude', event.get('center_longitude')) | |
event.set('center_latitude', event.get('center_latitude')) | |
rescue Exception => e | |
# event.tag('something happens in location block ') | |
end | |
begin | |
jsonn_json = JSON.parse(event.get('jsonn').to_s || '{}') | |
jsonn_json.each {|k,v| | |
event.set(k,v) | |
} | |
event.remove('jsonn') | |
rescue Exception => e | |
# event.tag('something happens in jsonn') | |
end | |
" | |
} | |
date { | |
match => ['time', 'UNIX'] | |
} | |
clone { | |
clones => ['test-union-007', ] | |
} | |
} | |
} | |
output {} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment