OpenSearch Logstash Pipeline development
In part 2 of this series we look at how the Logstash filter is created. The filter in Logstash is responsible for extracting the individual component fields and adding further information to each entry before passing it on to the OpenSearch engine.
Now that the basic data is arriving from the firewall computer, the extraction and enhancement need to be customized. Although the firewall at ElectricBrain is Ubuntu 20.04 running firewalld on top of iptables, the development steps outlined here can be used to adapt the filter to pretty much any firewall output.
The objective at this stage is to accept data at the input of the OpenSearch Logstash pipeline and then transform it into JSON with appropriate fields filled in such that OpenSearch can ingest and index the data. In this case "appropriate fields" means an index mapping OpenSearch can use to store and index the inbound data. Setting up the index mapping is accomplished via an index template when the index is created.
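To make the objective concrete, the following is a minimal Python sketch of the same idea: take one raw log line and turn it into a nested, JSON-ready structure. It is not the Logstash filter itself. The sample line is hypothetical (written in the style of an iptables/firewalld kernel log entry), the regular expressions are simplified stand-ins for the grok patterns, and the function name `to_event` is invented for illustration.

```python
import json
import re

# Hypothetical sample line in the style of an iptables/firewalld kernel log entry.
SAMPLE = (
    "[12345.678901] FINAL_REJECT: IN=ppp0 OUT= MAC= "
    "SRC=203.0.113.5 DST=198.51.100.7 LEN=40 TOS=0x00 PREC=0x00 "
    "TTL=240 ID=54321 PROTO=TCP SPT=44444 DPT=23 WINDOW=1024 "
    "RES=0x00 SYN URGP=0"
)

# Stage 1: kernel timestamp, log prefix, and everything after it.
HEADER = re.compile(
    r"\[\s*(?P<timestamplog>[\d.]+)?\]\s*(?P<event_src>\w+)?:\s*(?P<event_tail>.*)"
)
# Stage 2: source and destination IPv4 addresses from the remainder.
ADDRESSES = re.compile(
    r"SRC=(?P<src>\d+(?:\.\d+){3})\s+DST=(?P<dst>\d+(?:\.\d+){3})"
)


def to_event(line):
    """Return a nested dict for one log line, or None if the header does not match."""
    header = HEADER.match(line)
    if header is None:
        return None
    event = {
        "timestamplog": header["timestamplog"],
        "event_src": header["event_src"],
    }
    addresses = ADDRESSES.search(header["event_tail"])
    if addresses:
        event["source"] = {"address": addresses["src"]}
        event["destination"] = {"address": addresses["dst"]}
    return event


if __name__ == "__main__":
    print(json.dumps(to_event(SAMPLE), indent=2))
```

The nested `source` and `destination` objects correspond to the `[source][address]` style of field reference used in the Logstash filter.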
The creation of the mapping is discussed later. For now, the extraction of fields can be (and was) set up without regard for the final design of the mapping in the backend search engine. Ultimately it is in everyone's best interest to keep the design of the mapping fields consistent with the Elastic Common Schema, otherwise known by the acronym ECS. "The Elastic Common Schema (ECS) defines a common set of fields for ingesting data into Elasticsearch. A common schema helps you correlate data from sources like logs and metrics or IT operations analytics and security analytics."
The development process used at ElectricBrain to come up with pipeline filters was essentially to run Logstash over and over from the command line (via Docker), adjusting the filter syntax on each run:
    docker run \
        -it \
        --rm \
        --name logstash \
        --network host \
        registry:5000/opensearchproject/logstash-oss-with-opensearch-output-plugin:7.16.2 -e '
    # Format to a single line using:
    #   https://string.is/yaml-formatter
    # Once converted remove the extra "data:" and add the appropriate output clause
    # data:
    input {
      udp { port => 6666 }
    }
    filter {
      # Split each entry into kernel timestamp, log prefix and remainder.
      grok {
        match => { "message" => "\[%{SPACE}(%{NUMBER:timestamplog})?\]%{SPACE}(%{WORD:event_src})?\:%{SPACE}(%{GREEDYDATA:event_tail})?" }
      }
      # Keep only FINAL_REJECT entries.
      if [event_src] != "FINAL_REJECT" { drop {} }
      grok {
        match => { "event_tail" => "IN=(%{USERNAME:in_interface})?%{SPACE}OUT=(%{USERNAME:out_interface})?%{SPACE}MAC=(%{COMMONMAC:[destination][mac]}\:%{COMMONMAC:[source][mac]}\:..\:..)?(%{GREEDYDATA:event_tail})?" }
        overwrite => [ "event_tail" ]
      }
      # Keep only packets that arrived on ppp0.
      if [in_interface] != "ppp0" { drop {} }
      # Peek at the next word to detect truncated entries.
      grok {
        match => { "event_tail" => "(%{SPACE})?(%{WORD:junk})?%{GREEDYDATA}" }
      }
      if [junk] != "TRUNCATED" {
        grok {
          match => { "event_tail" => "(%{SPACE})?SRC=%{IPV4:[source][address]}%{SPACE}DST=%{IPV4:[destination][address]}%{SPACE}LEN=%{INT:[destination][bytes]}%{SPACE}TOS=%{BASE16NUM:[packet][tos]}%{SPACE}PREC=%{BASE16NUM:[packet][prec]}%{SPACE}TTL=%{INT:[packet][ttl]}%{SPACE}ID=%{INT:[packet][id]}%{GREEDYDATA:event_tail}" }
          overwrite => [ "event_tail" ]
        }
        mutate {
          replace => {"junk" => "%{[source][address]}"}
          add_field => {"[destination][ip]" => "%{[destination][address]}"}
          convert => {"[packet][tos]" => "integer" }
          convert => {"[packet][prec]" => "integer" }
        }
        # Hash the copied source address, then reshape the 32 hex digits
        # into eight colon-separated groups stored in [source][ip].
        anonymize {
          algorithm => "MD5"
          fields => [ "junk" ]
          key => "electricbrain"
        }
        grok {
          match => { "junk" => "(?<tmp0>[0-9a-f]{4})(?<tmp1>[0-9a-f]{4})(?<tmp2>[0-9a-f]{4})(?<tmp3>[0-9a-f]{4})(?<tmp4>[0-9a-f]{4})(?<tmp5>[0-9a-f]{4})(?<tmp6>[0-9a-f]{4})(?<tmp7>[0-9a-f]{4})%{GREEDYDATA:junk1}" }
        }
        mutate {
          add_field => { "[source][ip]" => "%{tmp0}:%{tmp1}:%{tmp2}:%{tmp3}:%{tmp4}:%{tmp5}:%{tmp6}:%{tmp7}" }
          remove_field => [ "tmp0", "tmp1", "tmp2", "tmp3", "tmp4", "tmp5", "tmp6", "tmp7" ]
        }
        grok {
          match => { "event_tail" => "(%{SPACE})?(%{WORD:junk})?%{GREEDYDATA}" }
          overwrite => [ "junk" ]
        }
        if [junk] == "CE" or [junk] == "DF" or [junk] == "MF" {
          grok {
            match => { "event_tail" => "(%{SPACE})?%{WORD:[packet][flags]}%{GREEDYDATA:event_tail}" }
            overwrite => [ "event_tail" ]
          }
        }
        grok {
          match => { "event_tail" => "(%{SPACE})?(FRAG:%{INT:[packet][frag]})?%{GREEDYDATA:event_tail}" }
          overwrite => [ "event_tail" ]
        }
        grok {
          match => { "event_tail" => "(%{SPACE})?(%{WORD:junk})?%{GREEDYDATA}" }
          overwrite => [ "junk" ]
        }
        if [junk] != "TRUNCATED" {
          grok {
            match => { "event_tail" => "(%{SPACE}OPT \(%{BASE16NUM:[packet][options]}\))?%{SPACE}PROTO=%{WORD:[packet][protocol]}%{GREEDYDATA:event_tail}" }
            overwrite => [ "event_tail" ]
          }
          if [packet][protocol] == "TCP" {
            grok {
              match => { "event_tail" => "(%{SPACE})?(INCOMPLETE \[(%{SPACE})?%{INT:[packet][tcp_incomplete_bytes]} bytes\])?SPT=%{INT:[source][port]} DPT=%{INT:[destination][port]}(SEQ=%{INT:[packet][tcp_seq]} ACK=%{INT:[packet][tcp_ack]})?%{SPACE}WINDOW=%{INT:[packet][tcp_window]}%{SPACE}RES=%{BASE16NUM:[packet][tcp_res]}(%{SPACE}%{GREEDYDATA:[packet][tcp_state]})?%{SPACE}URGP=%{INT:[packet][tcp_urgp]}(%{SPACE}OPT \(%{WORD:[packet][tcp_options]}\))?%{GREEDYDATA:event_tail}" }
              overwrite => [ "event_tail" ]
            }
            mutate {
              convert => { "[packet][tcp_res]" => "integer" }
            }
          }
          if [packet][protocol] == "UDP" {
            grok {
              match => { "event_tail" => "(%{SPACE})?(INCOMPLETE \[(%{SPACE})?%{INT:[packet][udp_incomplete_bytes]} bytes\])?(SPT=%{INT:[source][port]} DPT=%{INT:[destination][port]} LEN=%{INT:[packet][udp_len]})?%{GREEDYDATA:event_tail}" }
              overwrite => [ "event_tail" ]
            }
          }
          # Look up the location of the real source address.
          mutate {
            replace => { "junk" => "%{[source][address]}" }
          }
          geoip {
            source => "junk"
            target => "[source]"
            fields => [ "city_name", "continent_name", "country_code2", "country_name", "location", "region_code", "region_name" ]
          }
          mutate {
            rename => [ "[source][geo][country_code2]", "[source][geo][country_iso_code]" ]
            rename => [ "[source][geo][region_code]", "[source][geo][region_iso_code]" ]
            remove_field => [ "[source][geo][ip]", "[source][geo][latitude]", "[source][geo][longitude]" ]
          }
        }
      }
      mutate {
        rename => ["host", "hostname"]
        convert => {"hostname" => "string"}
        remove_field => "[junk]"
      }
    }
    output {
      opensearch {
        hosts => ["https://opensearch-node1:9200"]
        index => "ebrain_ids-0.1.2-%{+YYYY.MM.dd}"
        user => "userid"
        password => "xxxxxxxxxxxxx"
        ssl => true
        ssl_certificate_verification => false
      }
    }'
    # Alternative output clause (prints events to the console):
    # output {
    #   stdout { }
    # }
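The least obvious part of the filter is the sequence that copies the source address into `junk`, runs it through `anonymize`, and splits the result into `tmp0` to `tmp7`. One reading is that it replaces the real source address with a stable, IPv6-shaped pseudonym in `[source][ip]`. The Python sketch below illustrates that idea only. It assumes the hash is a keyed HMAC-MD5, which is an assumption about the plugin's behaviour, so the digits it produces may not match what Logstash emits. The function name `pseudonymise` is hypothetical.

```python
import hashlib
import hmac


def pseudonymise(address, key="electricbrain"):
    """Map an address to eight colon-separated groups of four hex digits.

    Assumption: a keyed HMAC-MD5 stands in for the anonymize filter.
    """
    digest = hmac.new(key.encode(), address.encode(), hashlib.md5).hexdigest()
    # An MD5 digest is 32 hex digits, i.e. exactly eight groups of four.
    groups = [digest[i:i + 4] for i in range(0, len(digest), 4)]
    return ":".join(groups)


if __name__ == "__main__":
    # 203.0.113.5 is a documentation address, used here as a made-up example.
    print(pseudonymise("203.0.113.5"))
```

Because the same input always yields the same output, entries from one source can still be grouped together without storing the real address in that field.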
Netconsole to ship kernel log information
Using netconsole to ship kernel logs to Logstash requires the netconsole package to be installed. The systemd configuration is shown below.
    [Unit]
    Description=Dynamically configure Linux netconsole
    Documentation=man:netconsole-setup(8)
    After=network-online.target
    Wants=network-online.target
    RequiresMountsFor=/usr

    [Service]
    Type=oneshot
    RemainAfterExit=yes
    EnvironmentFile=-/etc/default/netconsole
    ExecStart=/usr/sbin/netconsole-setup $NETCONSOLE_OPTS
    ExecReload=/usr/sbin/netconsole-setup $NETCONSOLE_OPTS

    [Install]
    WantedBy=multi-user.target
The configuration file is located in /etc/default/netconsole.
    # Default settings for netconsole service
    # This is a POSIX shell fragment

    # Additional options that are passed to the netconsole-setup script
    NETCONSOLE_OPTS="MyHost"
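While iterating on the filter it can be handy to feed the pipeline a hand-written line rather than wait for the firewall to log something. The Python sketch below sends a single UDP datagram to the port used by the `udp` input (6666 in the pipeline above). The helper name `send_test_line`, the default host and the sample line are all assumptions made for illustration.

```python
import socket


def send_test_line(line, host="127.0.0.1", port=6666):
    """Send one log line as a single UDP datagram and return the bytes sent."""
    data = line.encode("utf-8")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        return sock.sendto(data, (host, port))


if __name__ == "__main__":
    # Hypothetical entry in the style of an iptables/firewalld kernel log line.
    send_test_line(
        "[12345.678901] FINAL_REJECT: IN=ppp0 OUT= MAC= "
        "SRC=203.0.113.5 DST=198.51.100.7 LEN=40 TOS=0x00 PREC=0x00 "
        "TTL=240 ID=54321 PROTO=TCP SPT=44444 DPT=23 WINDOW=1024 "
        "RES=0x00 SYN URGP=0"
    )
```

With the `stdout` output clause enabled in place of the `opensearch` one, the parsed event should appear on the console of the running Logstash container.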