Working with Pipelines

Hi everyone. I have a lab with Graylog and Elasticsearch on the same node. It should move to production soon, at least I hope so.

Everything in Graylog has worked fine so far and it suits me perfectly, but I have a few questions about:

HOW TO WORK WITH PIPELINES?

So far I have only used pipelines for parsing logs. My method is to create a GROK pattern and then apply it in a pipeline rule, but right now I have about 20 rules in the same stage, all parsing and formatting logs.

Is this the right way to work with pipelines? Or can I apply these rules progressively?

For example, set some fields in the first stage, pass the message to the next stage and, if necessary, set more fields there, and so on until we have everything we need.

How do you work with pipelines? Do you have any advice for me?

Thanks to all

You seem to have the right idea. Use pipeline stages to sequence rule actions. As far as I have read, the rules in a single stage run at approximately the same time, with no guaranteed order between them, so if a rule needs to work on the results of another rule, it has to go in a later stage. All this helps to keep rule actions fast.

I have to think about how to restructure my rules to do this. At the moment I only need to parse some logs, but when the setup starts to grow I will need to change the logic.

Do you have some examples? I’m a little confused about how to do this.

thx!!

Here is a scenario from my environment:

Logs from Palo Alto come in as type SYSTEM or TRAFFIC (there are more, but let's stick with these two). I want to break out the fields in both types of logs, but they have different fields. To apply the relevant rule to each, I first have a rule that pulls out the type and stores it in a log_type field, so a message ends up with either log_type:SYSTEM or log_type:TRAFFIC. This rule is placed in Stage 0, which is set up to continue to Stage 1 if any rule in Stage 0 is “true” (that is, the rule's when section passes and its then section is executed).

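rule "PA-Firewall - ex0 - set log type"
when
    // Only handle messages that carry one of the known Palo Alto log types.
    regex(pattern: "(,TRAFFIC,|,THREAT,|,CONFIG,|,SYSTEM,)", value: to_string($message.message)).matches == true
then
    // The log type is the fourth comma-separated field (index 3).
    let splitlog = split(",", to_string($message.message));
    set_field("log_type", splitlog[3]);
end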

Now that I have marked the message for SYSTEM or TRAFFIC, in Stage 1 I can break the fields out based on the log_type field.

rule "PA-Firewall - ex1 - SYSTEM fields"
when
    has_field("log_type")           &&
    to_string($message.log_type) == "SYSTEM"
then
    let message     = to_string($message.message);

    // Extract the quoted event description with a regex first: it may contain
    // commas, which would throw off the split below. The fields after the
    // description are not needed, so they are ignored.
    let snagy       = regex(pattern: "(?<=,\")(.*)(?=.\",)",
                            value:   message);

    set_field("event_description", to_string(snagy["0"]));

    // Split on commas and pick the fixed-position fields by index.
    let splitsys    = split(",", message);
    set_field("hostname",          splitsys[0]);
    set_field("receive_date_time", splitsys[1]);
    set_field("serial_number",     splitsys[2]);
  //set_field("log_type",          splitsys[3]); -- already handled
    set_field("log_subtype",       splitsys[4]);
    set_field("time_generated",    splitsys[6]);
  //set_field("virtual_system",    splitsys[7]); -- not used
    set_field("event_id_name",     splitsys[8]);
    set_field("session_object",    splitsys[9]);
    set_field("subtype_module",    splitsys[12]);
    set_field("event_severity",    splitsys[13]);
end
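
For reference, the two rules above could be wired together with a pipeline definition along these lines. This is only a sketch: the pipeline name "PA Firewall" is made up for illustration, and it assumes both stages use "match either", which is the setting that lets a message continue to the next stage when at least one rule in the current stage matches.

```
// Hypothetical pipeline name; the rule names are the two rules shown above.
pipeline "PA Firewall"
// Stage 0: tag the message with log_type. With "match either", processing
// continues to Stage 1 if at least one rule in this stage matched.
stage 0 match either
    rule "PA-Firewall - ex0 - set log type";
// Stage 1: break out fields based on the log_type set in Stage 0.
stage 1 match either
    rule "PA-Firewall - ex1 - SYSTEM fields";
end
```

A rule for TRAFFIC logs would sit next to the SYSTEM rule in Stage 1, since both only depend on the log_type field that Stage 0 has already set.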

This is just a pipeline for FW logs - I have a separate pipeline that handles messages coming in from the Windows machines and parses out event IDs before putting them into their own stream/index.

Hopefully that helps…

Thank you so much, I appreciate your help. I’m going to work out a structure for my logs and will try to share it here.
