Thanks Jochen, see the information below. As I said, trying to search on “destPort:8010” fails.
NOTE that I use two static fields in the logstash.conf.
"short_message" => "hass test short_message"
"host" => "testserver"
This was done to remove warnings in Graylog related to the host field, and also an error it raised when processing some messages.
Example of how the messages section looks in Graylog:
| Timestamp | destIp | destPort | srcIp | srcPort | time |
| --- | --- | --- | --- | --- | --- |
| 2018-02-18 15:37:30.499 | 172.16.1.4 | 8010 | 13.129.107.213 | 35270 | 2018-02-18T15:36:15.4721718Z |
| 2018-02-18 15:37:30.499 | 172.16.1.4 | 8008 | 13.129.107.213 | 47508 | 2018-02-18T15:36:15.4721718Z |
| 2018-02-18 15:37:30.499 | 172.16.1.4 | 8010 | 13.129.107.213 | 35276 | 2018-02-18T15:36:15.4721718Z |
| 2018-02-18 15:37:30.499 | 172.16.1.4 | 8010 | 13.129.107.213 | 35277 | 2018-02-18T15:36:15.4721718Z |
| 2018-02-18 15:37:30.499 | 172.16.1.4 | 8010 | 13.129.107.213 | 35253 | 2018-02-18T15:36:15.4721718Z |
Example of an expanded message/record:
Received by
NSG Flow Logs US EAST on 721931dd / USA1042LV000905
Stored in index
graylog_0
Routed into streams
All messages
@timestamp
2018-02-18T15:36:00.000Z
@version
1
NetworkSecurityGroup
USA1039-NSG-NORTHCENTRALUS-BASE
ResourceGroup
USA1039-VDC-NORTHCENTRALUS
Subscription
AZURE-SUBSCRIPTION-ID-REMOVED-HASS
Version
1
category
NetworkSecurityGroupFlowEvent
destIp
172.16.1.4
destPort
8010
mac
000D3A609738
message
hass test short_message
operationName
NetworkSecurityGroupFlowEvents
protocol
T
records
{"resourceId":["","SUBSCRIPTIONS","AZURE-SUBSCRIPTION-ID-REMOVED-HASS)","RESOURCEGROUPS","USA1039-VDC-NORTHCENTRALUS","PROVIDERS","MICROSOFT.NETWORK","NETWORKSECURITYGROUPS","USA1039-NSG-NORTHCENTRALUS-BASE"],"operationName":"NetworkSecurityGroupFlowEvents","systemId":"AZURE-SYSTEM-ID-REMOVED-HASS","time":"2018-02-18T15:36:15.4721718Z","properties":{"Version":1,"flows":{"rule":"DefaultRule_AllowVnetInBound","flows":{"flowTuples":["1518968160","13.129.107.213","172.16.1.4","35270","8010","T","I","A"],"mac":"000D3A609738"}}},"category":"NetworkSecurityGroupFlowEvent"}
rule
DefaultRule_AllowVnetInBound
source
testserver
srcIp
13.129.107.213
srcPort
35270
systemId
AZURE-SYSTEM-ID-REMOVED-HASS
time
2018-02-18T15:36:15.4721718Z
timestamp
2018-02-18T15:37:30.499Z
traffic
A
trafficflow
I
unixtimestamp
1518968160
Logstash.conf example:
input {
  # Three azureblob readers. As pasted they point at the same (redacted)
  # storage account, container and registry blob. If the redaction hides three
  # different accounts this is fine; if they really are identical, every blob
  # is listed and emitted up to three times and the readers race on one
  # registry blob — keep a single input in that case.
  azureblob {
    storage_account_name   => "STORAGE-ACCOUNT-NAME-REMOVED-HASS"
    # Long-lived secret: keep it out of the config file and out of tickets.
    # Logstash resolves ${VARIABLE_NAME} from the keystore or the environment,
    # so the literal can be replaced by a reference.
    storage_access_key     => "STORAGE-ACCOUNT-KEY-REMOVED-HASS"
    container              => "insights-logs-networksecuritygroupflowevent"
    registry_path          => "data2/registry"
    registry_create_policy => "resume"
    codec                  => "json"
    # NSG flow-log blobs are wrapped in {"records":[ ... ]} and appended to in
    # place; 12 head bytes and 2 tail bytes are exactly that wrapper, so each
    # newly appended chunk parses on its own (confirm against the plugin
    # version in use).
    file_head_bytes        => 12
    file_tail_bytes        => 2
    blob_list_page_size    => 150
    interval               => 5
  }
  azureblob {
    storage_account_name   => "STORAGE-ACCOUNT-NAME-REMOVED-HASS"
    storage_access_key     => "STORAGE-ACCOUNT-KEY-REMOVED-HASS"
    container              => "insights-logs-networksecuritygroupflowevent"
    registry_path          => "data2/registry"
    registry_create_policy => "resume"
    codec                  => "json"
    file_head_bytes        => 12
    file_tail_bytes        => 2
    blob_list_page_size    => 150
    interval               => 5
  }
  azureblob {
    storage_account_name   => "STORAGE-ACCOUNT-NAME-REMOVED-HASS"
    storage_access_key     => "STORAGE-ACCOUNT-KEY-REMOVED-HASS"
    container              => "insights-logs-networksecuritygroupflowevent"
    registry_path          => "data2/registry"
    registry_create_policy => "resume"
    codec                  => "json"
    file_head_bytes        => 12
    file_tail_bytes        => 2
    blob_list_page_size    => 150
    interval               => 5
  }
}
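filter {
  # Each blob record nests arrays three deep; unwrap them one level at a time
  # so that every individual flow tuple becomes its own event.
  split { field => "[records]" }
  split { field => "[records][properties][flows]" }
  split { field => "[records][properties][flows][flows]" }
  split { field => "[records][properties][flows][flows][flowTuples]" }

  # Tokenise the resource id and the flow tuple, then copy the pieces into
  # flat, searchable fields. Inside one mutate, split always runs before
  # add_field, so the index references below see the split arrays.
  mutate {
    split => {
      "[records][resourceId]"                          => "/"
      "[records][properties][flows][flows][flowTuples]" => ","
    }
    add_field => {
      # resourceId splits as
      # ["", "SUBSCRIPTIONS", <sub>, "RESOURCEGROUPS", <rg>, "PROVIDERS",
      #  "MICROSOFT.NETWORK", "NETWORKSECURITYGROUPS", <nsg>]
      "Subscription"         => "%{[records][resourceId][2]}"
      "ResourceGroup"        => "%{[records][resourceId][4]}"
      "NetworkSecurityGroup" => "%{[records][resourceId][8]}"
      # flow tuple: unixtime,srcIp,destIp,srcPort,destPort,protocol(T/U),
      # direction(I/O),decision(A/D)
      "unixtimestamp" => "%{[records][properties][flows][flows][flowTuples][0]}"
      "srcIp"         => "%{[records][properties][flows][flows][flowTuples][1]}"
      "destIp"        => "%{[records][properties][flows][flows][flowTuples][2]}"
      "srcPort"       => "%{[records][properties][flows][flows][flowTuples][3]}"
      "destPort"      => "%{[records][properties][flows][flows][flowTuples][4]}"
      "protocol"      => "%{[records][properties][flows][flows][flowTuples][5]}"
      "trafficflow"   => "%{[records][properties][flows][flows][flowTuples][6]}"
      "traffic"       => "%{[records][properties][flows][flows][flowTuples][7]}"
      # Static GELF fields: the Graylog GELF input expects host and
      # short_message on every message. Note that a fixed host hides the real
      # origin (Graylog shows it as "source"); the NSG name would be a more
      # useful value if provenance matters.
      "short_message" => "hass test short_message"
      "host"          => "testserver"
      # Record-level metadata carried onto every flow event.
      "time"          => "%{[records][time]}"
      "systemId"      => "%{[records][systemId]}"
      "category"      => "%{[records][category]}"
      "operationName" => "%{[records][operationName]}"
      "Version"       => "%{[records][properties][Version]}"
      "rule"          => "%{[records][properties][flows][rule]}"
      "mac"           => "%{[records][properties][flows][flows][mac]}"
    }
  }

  # Type coercion lives in its own mutate on purpose: within a single mutate,
  # convert runs BEFORE add_field, so in the original config these converts
  # executed while unixtimestamp/srcPort/destPort did not exist yet and were
  # silently skipped — the ports reached Graylog as strings. A second mutate
  # sees the fields and actually produces integers; this may also be the
  # reason numeric searches on destPort misbehaved (unconfirmed).
  mutate {
    convert => {
      "unixtimestamp" => "integer"
      "srcPort"       => "integer"
      "destPort"      => "integer"
    }
  }

  # Set @timestamp from the flow's own epoch seconds rather than ingest time;
  # the UNIX pattern accepts the integer value produced above.
  date {
    match => ["unixtimestamp", "UNIX"]
  }
}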
output {
  # Plain UDP to port 12201, presumably a Graylog GELF listener. Nothing on
  # this path is authenticated or encrypted and dropped datagrams are not
  # retransmitted, so keep it on a trusted segment; a TCP/TLS GELF input is
  # preferable wherever the path leaves the host.
  udp {
    port => 12201
    host => "172.18.4.10"
  }
}