Unifi syslog, stream, pipeline, regex

Dear community,

I am working on my first pipeline rule and stumbled across something I don't understand. I want to parse a message and use a regex to write values into additional fields. I am pretty sure that my regex is valid, but no messages are processed. When I copy the received message, there are more fields available than are shown in the GUI.

What do I have to do to store source IP, destination IP, and the ports in dedicated fields?

Example message (syslog input from Unifi):

<4>Dec 26 10:33:49 USG-3P kernel: [LAN_LOCAL-default-A]IN=eth1 OUT= MAC=24:5a:4c:7b:37:aa:18:e8:29:b6:50:73:08:00 SRC=192.168.1.2 DST=192.168.1.1 LEN=52 TOS=0x00 PREC=0x00 TTL=64 ID=430 DF PROTO=TCP SPT=8080 DPT=52663 WINDOW=246 RES=0x00 ACK URGP=0

My pipeline rule:

rule "Regex Test"
when
  contains(to_string($message.source), "usg", true)
then
  let values = regex("^.*((?<=SRC=)\\d+\\.\\d+\\.\\d+\\.\\d+).*((?<=DST=)\\d+\\.\\d+\\.\\d+\\.\\d+).*((?<=PROTO=)\\w+).*((?<=SPT=)\\d+).*((?<=DPT=)\\d+).*$", (to_string($message.message)));
  set_field("source IP", values["0"]);
end

Screenshot from GUI:

Content of "copy message":
e.g. there are already remote_ip and remote_port fields available, but they are not shown in the GUI above.

{
  "gl2_accounted_message_size": 630,
  "level": 4,
  "gl2_remote_ip": "192.168.70.1",
  "gl2_remote_port": 45792,
  "streams": [
    "000000000000000000000001",
    "63a623024dde366c72447bb8"
  ],
  "gl2_message_id": "01GN6WVF6J7QK5AAAVXAVPPA7Q",
  "source": "USG-3P",
  "message": "USG-3P kernel: [LAN_LOCAL-default-A]IN=eth1 OUT= MAC=24:5a:4c:7b:37:aa:18:e8:29:b6:50:73:08:00 SRC=192.168.1.2 DST=192.168.1.1 LEN=52 TOS=0x00 PREC=0x00 TTL=64 ID=8174 DF PROTO=TCP SPT=8080 DPT=52682 WINDOW=314 RES=0x00 ACK FIN URGP=0",
  "gl2_source_input": "63a61d1a4dde366c72446f31",
  "full_message": "<4>Dec 26 10:40:06 USG-3P kernel: [LAN_LOCAL-default-A]IN=eth1 OUT= MAC=24:5a:4c:7b:37:aa:18:e8:29:b6:50:73:08:00 SRC=192.168.1.2 DST=192.168.1.1 LEN=52 TOS=0x00 PREC=0x00 TTL=64 ID=8174 DF PROTO=TCP SPT=8080 DPT=52682 WINDOW=314 RES=0x00 ACK FIN URGP=0",
  "facility_num": 0,
  "gl2_source_node": "e3923345-1eb2-43a2-8339-c60bf94e7539",
  "_id": "4820e410-8501-11ed-b8d9-02420a0a0a02",
  "facility": "kernel",
  "timestamp": "2022-12-26T09:40:06.000Z"
}

I am using Test grok patterns to build my regexes. It lets you test multiple different logs against one pattern.
It has some limitations around casting some fields, e.g. %{INT:field_name:NONNEGINT} will not work, but besides that it is a good tool.
To get it to work for Graylog, you have to adjust the escaping: Graylog usually needs two backslashes in a pipeline rule.
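For example, a minimal sketch (the field name source_port is just for illustration): the pattern SPT=(\d+) from the debugger becomes SPT=(\\d+) inside a rule string:

// double backslash inside the pipeline rule string
let values = regex("SPT=(\\d+)", to_string($message.message));
set_field("source_port", values["0"]);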

If I take your pattern into the debugger, it does not work, even after I remove the extra escaping.

Something like this works:

let values = regex(".*?IN=(\\w+) OUT=(?:(\\w+))? MAC=…", to_string($message.message));
set_field(
  field: "network_interface_in",
  value: values["0"]
);


Now, there are much better ways to get the fields. One option is the key value parser (see the sketch just below), another is to work with GROK.
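A minimal, untested sketch of the key value approach (the delimiter arguments are assumptions that happen to fit this log format):

set_fields(
  key_value(
    value: to_string($message.message),
    delimiters: " ",   // pairs are separated by spaces
    kv_delimiters: "=" // keys and values are separated by =
  )
);

Tokens without a value, like DF or ACK, may need extra handling.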

GROK could look like this:

set_fields(
  grok(
    pattern: ".*?IN=%{WORD:network_interface_in} OUT=(?:%{WORD:network_interface_out})? MAC=%{MAC:source_mac} SRC=%{IP:source_ip}…",
    value: to_string($message.message),
    only_named_captures: true
  )
);

I didn’t test the code, but I hope you get the idea. The interface OUT might be optional.GROK is a much more robust way to parse out fields. Here, it will take IPv4 and IPv6 and something like 300.999.34.5 will not work.

Hm, I also see that MAC= is not a real MAC address; it has to be something else. Maybe it is a copy-paste error, or the log is just like that. In that case the MAC GROK pattern will not work.

Hi @StefanAustin,

Thank you very much for your input. I have been learning lots of different syntaxes over the last two years, and I was hoping to use something I was familiar with. I am not keen on working my way into yet another syntax…

Having said that, I will have a closer look at GROK and try to see if I will understand the beauty of it.

If someone else has a working regex solution, I wouldn't mind getting some help with that. :slight_smile:

Hi @StefanAustin,

so I created a grok pattern which fits my needs.

^.*?SRC=%{IP:source_ip} DST=%{IP:destination_ip}.*? PROTO=%{WORD:protocol} SPT=%{NUMBER:source_port} DPT=%{NUMBER:destination_port} .*?$
USG-3P kernel: [LAN_LOCAL-default-A]IN=eth1.40 OUT= MAC= SRC=192.168.40.1 DST=224.0.0.251 LEN=138 TOS=0x00 PREC=0x00 TTL=1 ID=33414 DF PROTO=UDP SPT=5353 DPT=5353 LEN=118
MATCHED
destination_ip 	224.0.0.251
source_port 	5353
source_ip 	192.168.40.1
protocol 	UDP
destination_port 	5353
USG-3P kernel: [LAN_LOCAL-default-A]IN=eth1 OUT= MAC=24:5a:4c:7b:37:aa:18:e8:29:b6:50:73:08:00 SRC=192.168.1.2 DST=192.168.1.1 LEN=84 TOS=0x00 PREC=0x00 TTL=64 ID=27575 DF PROTO=UDP SPT=3478 DPT=42848 LEN=64
MATCHED
destination_ip 	192.168.1.1
source_port 	3478
source_ip 	192.168.1.2
protocol 	UDP
destination_port 	42848 

How do I implement this in a pipeline to get the new fields into my log messages?

@StefanAustin,

okay, this was quick. :slight_smile:

I was able to create the respective rule and implement it. There are now even more fields available than I specified.

rule "Unifi USG-3P firewall messages"
when
  contains(to_string($message.source), "usg", true)
then
  set_fields(
    grok(
      pattern:"^.*?SRC=%{IP:source_ip} DST=%{IP:destination_ip}.*? PROTO=%{WORD:protocol} SPT=%{NUMBER:source_port} DPT=%{NUMBER:destination_port} .*?$",
      value:to_string($message.message)
    )
  );
end

I added the rule to a pipeline which only contains this rule and is connected to a specific stream.

Why is the rule applied to the default stream as well? I do not mind, but as far as I know, this shouldn't happen?

Chris

Screenshots

Pipeline Rule

Pipeline

Stream “Unifi Syslog”

default stream

Awesome,
It looks like it works.
There are multiple ways to get rid of the default stream. One is via code:

route_to_stream(
  name: "your_stream_name",
  remove_from_default: true
);

If you create your own stream via the GUI, there should be a checkbox to remove matches from the default stream. Both ways should work.

If you write rules, it is common to do the identification in one stage, move the logs into a user stream in the next stage, and do all the parsing at the end (see the sketch after this paragraph).
The reason is that you do not want to do the heavy lifting in your first rule, because every single log will hit that rule.
It also depends on how many logs you are getting: at 100 per hour it is not that important, but at 1000 per second the first rule, and all the others, have to be as efficient as possible.
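A sketch of that stage layout in a pipeline definition (the rule names are hypothetical):

pipeline "Unifi USG"
stage 0 match either
  rule "identify usg messages";
stage 1 match either
  rule "route usg messages to stream";
stage 2 match either
  rule "parse usg firewall fields";
end

With match either, a message only proceeds to the next stage if at least one rule condition in the current stage matched, so the cheap identification gates the expensive parsing.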

Example: you are using contains. It is very easy to use, but it is a very expensive search.

To identify messages you can use things like:

grok(
  pattern: "IN=%{NOTSPACE} OUT=%{NOTSPACE} MAC=",
  value: abbreviate(
    value: to_string($message.message),
    width: 255
  )
).matches == true

It will look for the specific string within the first 255 characters, and it is a lot cheaper than a contains.

If you do not like some of the fields (e.g. ones created via the key value parser), you can remove them:

remove_field("unwanted_field_name");

Additionally, you might want to use Graylog's schema; protocol would become network_transport.

The reason is that you may be able to reuse content from other users. Maybe someone else built a dashboard, and if you both use the same field names, it will work right away.

https://schema.graylog.org/en/development/schema/entities/network.html
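A sketch of such a rename (assuming the rename_field pipeline function is available in your Graylog version; the field names follow the schema example above):

// rename the grok-extracted field to the Graylog schema name
rename_field(
  old_field: "protocol",
  new_field: "network_transport"
);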

I saw you have unwanted fields like IPv4. To get rid of those, you need this line in your GROK call:

only_named_captures: true
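Applied to your rule, the grok call would look something like this (untested sketch, reusing your pattern from above):

set_fields(
  grok(
    pattern: "^.*?SRC=%{IP:source_ip} DST=%{IP:destination_ip}.*? PROTO=%{WORD:protocol} SPT=%{NUMBER:source_port} DPT=%{NUMBER:destination_port} .*?$",
    value: to_string($message.message),
    only_named_captures: true
  )
);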

I hope I didn’t flood you with ideas :wink:

Hi @StefanAustin,

thank you very much for all those inputs. I will adopt one or two things. :slight_smile:

One thing I do not yet understand:
I thought that my pipeline does its magic only on the stream it is connected to. But obviously, the pipeline is applied to the default stream as well. Is this how it should be?

A single message can be routed into multiple streams; in your case, both the default "All messages" stream and your "Unifi Syslog" stream. There is only a single instance of the message, so any changes are reflected regardless of which stream you use to access it.

Hi @patrickmann,

thank you for your answer. Do I understand you correctly that the messages exist only once and are shown in two streams?

(I thought that routing the syslog messages into my Unifi stream would copy the message into the new stream, or move it, if I tick the option to remove it from the default stream.)

What about the indices? Will the "copied" message be saved in both indices?

Thanks for clarifying,
Chris

hi @schneich
if you open the message, you can see in the metadata on the left which streams the message belongs to. There can be multiple streams, and there are legitimate use cases for this. I use this feature e.g. for Windows: all Windows logs go into one "windows all" stream. Then we have Windows logs from AD servers, from file servers, from here, from there, and so on.
Some alerting can be done on the general "windows all" stream, some only on specific streams; it depends.

As long as all the streams exist on the same index set, I would assume the messages are stored only once, but with several stream IDs. If they are on multiple index sets, I would assume it increases the storage costs.

Hi @ihe,
thank you for clarifying that. I decided to route all messages into specific streams and remove them from the default stream. The default stream is now empty.

thanks to you all for your insights and valuable help!
Chris
