From 91b0cc43dd66b489660bf9ad0e4124186393da94 Mon Sep 17 00:00:00 2001 From: Karen Metts <35154725+karenzone@users.noreply.github.com> Date: Wed, 3 Sep 2025 12:47:19 -0400 Subject: [PATCH] Doc: Refine geoip filter example (#18101) Added sample output structure for geoip filter Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com> (cherry picked from commit 86ee4df168c1c9094f4a9e816fbbe9354f546992) # Conflicts: # docs/static/transforming-data.asciidoc --- docs/static/transforming-data.asciidoc | 657 +++++++++++++++++++++++++ 1 file changed, 657 insertions(+) create mode 100644 docs/static/transforming-data.asciidoc diff --git a/docs/static/transforming-data.asciidoc b/docs/static/transforming-data.asciidoc new file mode 100644 index 0000000000..ddd9a298d0 --- /dev/null +++ b/docs/static/transforming-data.asciidoc @@ -0,0 +1,657 @@ +[[transformation]] +== Transforming Data + +With over 200 plugins in the Logstash plugin ecosystem, it's sometimes +challenging to choose the best plugin to meet your data processing needs. +In this section, we've collected a list of popular plugins and organized them +according to their processing capabilities: + +* <> +* <> +* <> +* <> + +Also see <> and <> for the full list of available +data processing plugins. + +[[core-operations]] +=== Performing Core Operations + +The plugins described in this section are useful for core operations, such as +mutating and dropping events. + +<>:: + +Parses dates from fields to use as Logstash timestamps for events. ++ +The following config parses a field called `logdate` to set the Logstash +timestamp: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + date { + match => [ "logdate", "MMM dd yyyy HH:mm:ss" ] + } +} +-------------------------------------------------------------------------------- + + +<>:: + +Drops events. This filter is typically used in combination with conditionals. ++ +The following config drops `debug` level log messages: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + if [loglevel] == "debug" { + drop { } + } +} +-------------------------------------------------------------------------------- + + +<>:: + +Fingerprints fields by applying a consistent hash. ++ +The following config fingerprints the `IP`, `@timestamp`, and `message` fields +and adds the hash to a metadata field called `generated_id`: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + fingerprint { + source => ["IP", "@timestamp", "message"] + method => "SHA1" + key => "0123" + target => "[@metadata][generated_id]" + } +} +-------------------------------------------------------------------------------- + + +<>:: + +Performs general mutations on fields. You can rename, remove, replace, and +modify fields in your events. ++ +The following config renames the `HOSTORIP` field to `client_ip`: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + mutate { + rename => { "HOSTORIP" => "client_ip" } + } +} +-------------------------------------------------------------------------------- ++ +The following config strips leading and trailing whitespace from the specified +fields: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + mutate { + strip => ["field1", "field2"] + } +} +-------------------------------------------------------------------------------- + + +<>:: + +Executes Ruby code. ++ +The following config executes Ruby code that cancels 90% of the events: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + ruby { + code => "event.cancel if rand <= 0.90" + } +} +-------------------------------------------------------------------------------- + + +[[data-deserialization]] +=== Deserializing Data + +The plugins described in this section are useful for deserializing data into +Logstash events. + +<>:: + +Reads serialized Avro records as Logstash events. This plugin deserializes +individual Avro records. It is not for reading Avro files. Avro files have a +unique format that must be handled upon input. ++ +The following config deserializes input from Kafka: ++ +[source,json] +---------------------------------- +input { + kafka { + codec => { + avro => { + schema_uri => "/tmp/schema.avsc" + } + } + } +} +... +---------------------------------- + + +<>:: + +Parses comma-separated value data into individual fields. By default, the +filter autogenerates field names (column1, column2, and so on), or you can specify +a list of names. You can also change the column separator. ++ +The following config parses CSV data into the field names specified in the +`columns` field: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + csv { + separator => "," + columns => [ "Transaction Number", "Date", "Description", "Amount Debit", "Amount Credit", "Balance" ] + } +} +-------------------------------------------------------------------------------- + +<>:: + +Reads the Fluentd `msgpack` schema. ++ +The following config decodes logs received from `fluent-logger-ruby`: ++ +[source,json] +-------------------------------------------------------------------------------- +input { + tcp { + codec => fluent + port => 4000 + } +} +-------------------------------------------------------------------------------- + + +<>:: + +Decodes (via inputs) and encodes (via outputs) JSON formatted content, creating +one event per element in a JSON array. ++ +The following config decodes the JSON formatted content in a file: ++ +[source,json] +-------------------------------------------------------------------------------- +input { + file { + path => "/path/to/myfile.json" + codec =>"json" +} +-------------------------------------------------------------------------------- + + +<>:: + +Reads protobuf encoded messages and converts them to Logstash events. Requires +the protobuf definitions to be compiled as Ruby files. You can compile them by +using the +https://github.com/codekitchen/ruby-protocol-buffers[ruby-protoc compiler]. ++ +The following config decodes events from a Kafka stream: ++ +[source,json] +-------------------------------------------------------------------------------- +input + kafka { + zk_connect => "127.0.0.1" + topic_id => "your_topic_goes_here" + codec => protobuf { + class_name => "Animal::Unicorn" + include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb'] + } + } +} +-------------------------------------------------------------------------------- + + +<>:: + +Parses XML into fields. ++ +The following config parses the whole XML document stored in the `message` field: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + xml { + source => "message" + } +} +-------------------------------------------------------------------------------- + + +[[field-extraction]] +=== Extracting Fields and Wrangling Data + +The plugins described in this section are useful for extracting fields and +parsing unstructured data into fields. + +<>:: + +Extracts unstructured event data into fields by using delimiters. The dissect +filter does not use regular expressions and is very fast. However, if the +structure of the data varies from line to line, the grok filter is more +suitable. ++ +For example, let's say you have a log that contains the following message: ++ +[source,json] +-------------------------------------------------------------------------------- +Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool... +-------------------------------------------------------------------------------- ++ +The following config dissects the message: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + dissect { + mapping => { "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}" } + } +} +-------------------------------------------------------------------------------- ++ +After the dissect filter is applied, the event will be dissected into the following +fields: ++ +[source,json] +-------------------------------------------------------------------------------- +{ + "msg" => "Starting system activity accounting tool...", + "@timestamp" => 2017-04-26T19:33:39.257Z, + "src" => "localhost", + "@version" => "1", + "host" => "localhost.localdomain", + "pid" => "1", + "message" => "Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...", + "type" => "stdin", + "prog" => "systemd", + "ts" => "Apr 26 12:20:02" +} +-------------------------------------------------------------------------------- + +<>:: + +Parses key-value pairs. ++ +For example, let's say you have a log message that contains the following +key-value pairs: ++ +[source,json] +-------------------------------------------------------------------------------- +ip=1.2.3.4 error=REFUSED +-------------------------------------------------------------------------------- ++ +The following config parses the key-value pairs into fields: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + kv { } +} +-------------------------------------------------------------------------------- ++ +After the filter is applied, the event in the example will have these fields: ++ +* `ip: 1.2.3.4` +* `error: REFUSED` + + +<>:: + +Parses unstructured event data into fields. This tool is perfect for syslog +logs, Apache and other webserver logs, MySQL logs, and in general, any log +format that is generally written for humans and not computer consumption. +Grok works by combining text patterns into something that matches your +logs. ++ +For example, let's say you have an HTTP request log that contains +the following message: ++ +[source,json] +-------------------------------------------------------------------------------- +55.3.244.1 GET /index.html 15824 0.043 +-------------------------------------------------------------------------------- ++ +The following config parses the message into fields: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + grok { + match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } + } +} +-------------------------------------------------------------------------------- ++ +After the filter is applied, the event in the example will have these fields: ++ +* `client: 55.3.244.1` +* `method: GET` +* `request: /index.html` +* `bytes: 15824` +* `duration: 0.043` + +TIP: If you need help building grok patterns, try the +{kibana-ref}/xpack-grokdebugger.html[Grok Debugger]. The Grok Debugger is an +{xpack} feature under the Basic License and is therefore *free to use*. + + +[[lookup-enrichment]] +=== Enriching Data with Lookups + +These plugins can help you enrich data with +additional info, such as GeoIP and user agent info: + +* <> +* <> +* <> +* <> +* <> +* <> +* <> +* <> +* <> + +[float] +[[lookup-plugins]] +=== Lookup plugins + +[[dns-def]]dns filter:: + +The <> performs a standard or reverse DNS lookup. ++ +The following config performs a reverse lookup on the address in the +`source_host` field and replaces it with the domain name: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + dns { + reverse => [ "source_host" ] + action => "replace" + } +} +-------------------------------------------------------------------------------- + +[[es-def]]elasticsearch filter:: + +The <> copies fields from previous log events in Elasticsearch to current events. ++ +The following config shows a complete example of how this filter might +be used. Whenever Logstash receives an "end" event, it uses this Elasticsearch +filter to find the matching "start" event based on some operation identifier. +Then it copies the `@timestamp` field from the "start" event into a new field on +the "end" event. Finally, using a combination of the date filter and the +ruby filter, the code in the example calculates the time duration in hours +between the two events. ++ +[source,json] +-------------------------------------------------- + if [type] == "end" { + elasticsearch { + hosts => ["es-server"] + query => "type:start AND operation:%{[opid]}" + fields => { "@timestamp" => "started" } + } + date { + match => ["[started]", "ISO8601"] + target => "[started]" + } + ruby { + code => 'event.set("duration_hrs", (event.get("@timestamp") - event.get("started")) / 3600) rescue nil' + } + } +-------------------------------------------------- + +[[geoip-def]]geoip filter:: + +The <> adds geographical information about the location of IP addresses. For example: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + geoip { + source => "clientip" + target => "[source]" + } +} +-------------------------------------------------------------------------------- ++ +After the geoip filter is applied, the event is enriched with geoip fields. +For example: ++ +[source,json] +-------------------------------------------------------------------------------- +"source" => { + "geo" => { + "country_name" => "France", + "country_iso_code" => "FR", + "continent_code" => "EU", + "location" => { + "lon" => 2.3387, + "lat" => 48.8582 + }, + "timezone" => "Europe/Paris" + }, + "ip" => "82.67.74.30" +} +-------------------------------------------------------------------------------- + +[[http-def]]http filter:: + +The <> integrates with external web +services/REST APIs, and enables lookup enrichment against any HTTP service or +endpoint. This plugin is well suited for many enrichment use cases, such as +social APIs, sentiment APIs, security feed APIs, and business service APIs. +//+ +//[source,txt] +//----- +//filter { +// http { +// url => "http://example.com" +// verb => GET +// body => { +// "user-id" => "%{user}" +// "api-key" => "%{api_key}" +// } +// body_format => "json" +// headers => +// "Content-type" => "application/json" +// } +// target_body => "new_field" +// } +//} +//----- + +[[jdbc-static-def]]jdbc_static filter:: + +The <> enriches events with data pre-loaded from a remote database. ++ +The following example fetches data from a remote database, caches it in a local +database, and uses lookups to enrich events with data cached in the local +database. ++ +["source","json",subs="callouts"] +----- +filter { + jdbc_static { + loaders => [ <1> + { + id => "remote-servers" + query => "select ip, descr from ref.local_ips order by ip" + local_table => "servers" + }, + { + id => "remote-users" + query => "select firstname, lastname, userid from ref.local_users order by userid" + local_table => "users" + } + ] + local_db_objects => [ <2> + { + name => "servers" + index_columns => ["ip"] + columns => [ + ["ip", "varchar(15)"], + ["descr", "varchar(255)"] + ] + }, + { + name => "users" + index_columns => ["userid"] + columns => [ + ["firstname", "varchar(255)"], + ["lastname", "varchar(255)"], + ["userid", "int"] + ] + } + ] + local_lookups => [ <3> + { + id => "local-servers" + query => "select descr as description from servers WHERE ip = :ip" + parameters => {ip => "[from_ip]"} + target => "server" + }, + { + id => "local-users" + query => "select firstname, lastname from users WHERE userid = :id" + parameters => {id => "[loggedin_userid]"} + target => "user" <4> + } + ] + # using add_field here to add & rename values to the event root + add_field => { server_name => "%{[server][0][description]}" } + add_field => { user_firstname => "%{[user][0][firstname]}" } <5> + add_field => { user_lastname => "%{[user][0][lastname]}" } + remove_field => ["server", "user"] + jdbc_user => "logstash" + jdbc_password => "example" + jdbc_driver_class => "org.postgresql.Driver" + jdbc_driver_library => "/tmp/logstash/vendor/postgresql-42.1.4.jar" + jdbc_connection_string => "jdbc:postgresql://remotedb:5432/ls_test_2" + } +} +----- +<1> Queries an external database to fetch the dataset that will be cached +locally. +<2> Defines the columns, types, and indexes used to build the local database +structure. The column names and types should match the external database. +<3> Performs lookup queries on the local database to enrich the events. +<4> Specifies the event field that will store the looked-up data. If the lookup +returns multiple columns, the data is stored as a JSON object within the field. +<5> Takes data from the JSON object and stores it in top-level event fields for +easier analysis in Kibana. + +[[jdbc-stream-def]]jdbc_streaming filter:: + +The <> enriches events with database data. ++ +The following example executes a SQL query and stores the result set in a field +called `country_details`: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + jdbc_streaming { + jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar" + jdbc_driver_class => "com.mysql.jdbc.Driver" + jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase" + jdbc_user => "me" + jdbc_password => "secret" + statement => "select * from WORLD.COUNTRY WHERE Code = :code" + parameters => { "code" => "country_code"} + target => "country_details" + } +} +-------------------------------------------------------------------------------- + +[[memcached-def]]memcached filter:: + +The <> enables key/value lookup +enrichment against a Memcached object caching system. +It supports both read (GET) and write (SET) operations. It is a notable addition +for security analytics use cases. + +[[translate-def]]translate filter:: + +The <> replaces field contents based on replacement values specified in a hash or file. +Currently supports these file types: YAML, JSON, and CSV. ++ +The following example takes the value of the `response_code` field, translates +it to a description based on the values specified in the dictionary, and then +removes the `response_code` field from the event: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + translate { + field => "response_code" + destination => "http_response" + dictionary => { + "200" => "OK" + "403" => "Forbidden" + "404" => "Not Found" + "408" => "Request Timeout" + } + remove_field => "response_code" + } +} +-------------------------------------------------------------------------------- + +[[useragent-def]]useragent filter:: + +The <> parses user agent strings into fields. ++ +The following example takes the user agent string in the `agent` field, parses +it into user agent fields, and adds the user agent fields to a new field called +`user_agent`. It also removes the original `agent` field: ++ +[source,json] +-------------------------------------------------------------------------------- +filter { + useragent { + source => "agent" + target => "user_agent" + remove_field => "agent" + } +} +-------------------------------------------------------------------------------- ++ +After the filter is applied, the event will be enriched with user agent fields. +For example: ++ +[source,json] +-------------------------------------------------------------------------------- + "user_agent": { + "os": "Mac OS X 10.12", + "major": "50", + "minor": "0", + "os_minor": "12", + "os_major": "10", + "name": "Firefox", + "os_name": "Mac OS X", + "device": "Other" + } +--------------------------------------------------------------------------------