I am trying to get a pipeline working and can’t seem to get the rule to work properly. I have a separate input configured for all of my Cisco switches, a stream configured to match on gl2_source_input for that Cisco input, and a pipeline connected to the Cisco switch stream. That pipeline has one rule, which groks the messages it receives and breaks them into fields.
My rule is as follows:
rule "Grok Cisco syslog"
when
true
then
set_fields (
fields: grok (
pattern: "%{CISCO_SYSLOG_1}",
value: to_string($message.full_message),
only_named_captures: true
)
);
end
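(As an aside for anyone reading along: full_message is typically only present when the input’s “Store full message?” option is enabled, so a slightly more defensive variant of the same rule — just a sketch, using the built-in has_field function — would only fire when the field exists:

rule "Grok Cisco syslog"
when
  has_field("full_message")
then
  set_fields(
    fields: grok(
      pattern: "%{CISCO_SYSLOG_1}",
      value: to_string($message.full_message),
      only_named_captures: true
    )
  );
end
)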
This is where I’m struggling. The grok pattern works properly according to the “test with sample data” feature in the editor; it parses the fields correctly.
My problem is that when running the simulator on the pipeline, it neither parses the message properly nor stores it. I have even simplified the pattern to just GREEDYDATA and it still doesn’t work.
I have discovered that GREEDYDATA works on a value of $message.message; however, the message is still not stored.
I am not sure what I am missing to get the pipeline to parse the message and break it into the same fields it shows in the first image when testing with grok.
%{IPV4:source_ip} %{GREEDYDATA} Host %{DATA:mac_address} in vlan %{INT:vlan} is flapping between port %{GREEDYDATA:port_a} and port %{GREEDYDATA:port_b}
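For reference, a message shaped like this (the IP, sequence number, timestamp, MAC, VLAN, and port names are all invented for illustration) would match that pattern:

10.1.20.5 123: *Mar 1 00:01:02: %SW_MATM-4-MACFLAP_NOTIF: Host aabb.cc00.0100 in vlan 10 is flapping between port Gi1/0/1 and port Gi1/0/2

With only_named_captures: true, that yields source_ip, mac_address, vlan, port_a, and port_b as fields.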
This is for the first picture; I believe an extractor/Grok pattern is an easy way to go.
My pattern in the first picture works; all the data is parsed correctly. That is why I am confused as to why it doesn’t work. Additionally, that is just one message of a particular format; there are other messages, not related to port security, that follow it.
Does Graylog parse the messages before sending them through a pipeline? Sometimes it seems like it doesn’t leave the whole message intact by the time it gets to the pipeline stage.
Another thing I need to know is: what causes the data to be stored? I thought it would be when a pattern matches; however, if I change the pattern to simply %{GREEDYDATA}, it is still not stored.
More information:
Apparently Graylog does not allow the standard expressions for beginning of line and end of line here. My original pattern has the ^ and $ anchors, which I was taught to use when learning Elasticsearch because they make the grok quicker: instead of looking for the pattern in any portion of the string, it either matches the entire string or moves on. The custom grok pattern I created has these in it, and it works on test data as seen in my earlier screenshots. HOWEVER, when used in a rule for a pipeline, apparently this is not allowed. Removing those two characters now has my pattern matching under the rule simulation. You can see here the additional fields facility, facility_mnemonic, first_number, etc.; those are all fields parsed by my custom grok pattern.
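To make the fix concrete, here is a sketch (the pattern body is abbreviated; CISCO_SYSLOG_1 is my custom pattern name):

# Matches in the “test with sample data” editor but not inside a pipeline rule:
CISCO_SYSLOG_1 ^%{IPV4:source_ip} %{GREEDYDATA} Host %{DATA:mac_address} ...$

# Works in both once the anchors are removed:
CISCO_SYSLOG_1 %{IPV4:source_ip} %{GREEDYDATA} Host %{DATA:mac_address} ...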
Still more information. While using the simulator in the pipeline I set the following settings:
Stream: Cisco switches
Raw message: the same port security message I’ve been using
Message input: Cisco switches
Message codec: raw string
Loading this message and running the simulation, it works properly. It shows the added fields. Why does it not log them this way?
This is what I expect to happen, and I want this information indexed.
So I decided to try an extractor using the same custom pattern I have configured, and that works just fine. What is it about pipelines that makes this more complicated?
Not sure, but you could run a DEBUG to find out what is going on.
example:
rule "firewall"
when
true
then
let fw = grok(pattern: "%{SYSLOGTIMESTAMP} %{GREEDYDATA} fw=\"%{DATA:source}\" pri=%{INT:priority} msg=\"%{DATA:identifer}: %{DATA:loglevel}: %{DATA:message}\"", value: "message", only_named_captures: true);
set_fields(fw);
debug(concat("=== This is fw: ", to_string(fw)));
end
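One note on debug(): its output does not show up in the search results; it is written to the Graylog server log. On a typical package install you can follow it with something like this (the path may differ on your system):

tail -f /var/log/graylog-server/server.log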
Big thanks to @gsmith for reminding me about the debug feature!
I changed my rule to:
rule "Grok Cisco syslog"
when
true
then
let parsed = grok (
pattern: "%{CISCO_SYSLOG_1}",
value: to_string($message.full_message),
only_named_captures: true
);
set_fields (
fields: parsed
);
debug(concat("===Debug Message --> ", to_string(parsed)));
end
I checked the logs and the message was empty. It was not even parsing the string. Some additional searching led me to recheck the message processor order. The pipeline processor was BEFORE the stream processor! A quick adjustment on that and suddenly I was processing messages!
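For anyone hitting the same wall: stream routing is done by the processor called the Message Filter Chain, so under System -> Configurations the message processor order that worked for me looks roughly like this (other processors omitted):

1. Message Filter Chain   <- routes messages into streams
2. Pipeline Processor     <- now sees messages after they are in the Cisco stream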
After that, I had to debug the message that was being received and realized it was a shortened version of the entire syslog message I was expecting. I changed $message.message to $message.full_message and finally my messages are being parsed correctly!
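If you want to see the difference for yourself, a throwaway rule like this (just a sketch) prints both fields to the server log:

rule "compare message fields"
when
  has_field("full_message")
then
  debug(concat("message: ", to_string($message.message)));
  debug(concat("full_message: ", to_string($message.full_message)));
end

In my case, message held the shortened version and full_message held the original syslog line.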
I’m not sure why the default would be to have the stream processor after the pipeline processor, since the documentation seems to point you in the direction of using streams for processing.
Hopefully my struggles help someone else down the road. I can continue on from here.