Creating map from a list of keys and list of values

I am new to Graylog and watched a recent video by Lawrence Systems on Graylog.
Rather than use an extractor, I’m attempting to parse pf filterlog CSV log lines into fields via pipeline rules.
For example:

// Pipeline rule for pf filterlog IPv4 TCP lines: splits the CSV payload in the
// "message" field into a list of values, with the intent of storing each value
// under the matching column name from `keys`.
// NOTE: unfinished as posted - `map` is assigned an empty map, so the
// set_fields() call below receives no key/value pairs.
rule "filterlog IPv4 TCP"
when
  // Fires only when all three hold: a "message" field exists, application_name
  // contains "filterlog", and the message contains ",in,4," or ",out,4,"
  // followed later by ",tcp,".
  has_field("message") &&
  contains(to_string($message.application_name), "filterlog") &&
  regex("^.+,(in|out),4,.*,tcp,.*$", to_string($message.message)).matches == true

then
  //set_field("filterlog", "tcp");
  
  // Column names, in the order the values appear in the CSV line.
  let keys = [
    "RuleNumber", "SubRuleNumber", "Anchor", "Tracker", "Interface", "Reason", "Action", "Direction", "IPVersion",
    "TOS", "ECN", "TTL", "ID", "Offset", "Flags", "ProtocolID", "Protocol", "Length", "SourceIP", "DestIP",
    "SourcePort", "DestPort", "DataLength", "TCPFlags", "Sequence", "ACK", "Window", "URG", "Options"
  ];
  
  // Break the CSV payload into a list of column values.
  let values = split(",", to_string($message.message));
  
  // Placeholder: nothing here pairs `keys` with `values` yet.
  let map = {}; // Create map from keys,values lists?
  
  set_fields(
    fields: map
  );

end

I cannot figure out how to construct map, from keys and values, in order to pass to set_fields().

How might I do that?
Also, I do not understand what role the message argument to set_fields() plays.
I am using graylog 4.2.
Any advice would be much appreciated.

I think what you need is the split function, but need to see a sample log to be sure. Can you sanitize a few lines and post them here?

Thanks Chris. I am using split() to get the CSV values as a list as above.
I then have a statically defined list that contains the field names (keys).

This is the working extractor, based on https://github.com/lawrencesystems/graylog_extractors/blob/main/pfsense_2023.json:

{
  "title": "filterlog IPv4 TCP",
  "extractor_type": "regex",
  "converters": [
    {
      "type": "csv",
      "config": {
        "trim_leading_whitespace": false,
        "column_header": "RuleNumber,SubRuleNumber,Anchor,Tracker,Interface,Reason,Action,Direction,IPVersion,TOS,ECN,TTL,ID,Offset,Flags,ProtocolID,Protocol,Length,SourceIP,DestIP,SourcePort,DestPort,DataLength,TCPFlags,Sequence,ACK,Window,URG,Options",
        "strict_quotes": false
      }
    }
  ],
  "order": 1,
  "cursor_strategy": "copy",
  "source_field": "full_message",
  "target_field": "FilterData",
  "extractor_config": {
    "regex_value": "filterlog \\d+ - - (.+)"
  },
  "condition_type": "regex",
  "condition_value": "filterlog \\d+ - - (.+),(in|out),4,.*,tcp,.*$"
}

I modified it since I’m sending the logs as RFC5424 and this results in slightly different message parsing by the Graylog syslog input:

application_name: filterlog
facility: local0
facility_num: 16
full_message: <134>1 2023-05-19T13:52:21.917213+01:00 fw.local.lan filterlog 15175 - - 117,,,1770008978,pppoe0,match,block,in,4,0x0,,249,54321,0,none,6,tcp,44,198.235.24.121,192.168.100.10,55344,443,0,S,3528040379,,65535,,mss
level: 6
message: 117,,,1770008978,pppoe0,match,block,in,4,0x0,,249,54321,0,none,6,tcp,44,198.235.24.121,192.168.100.10,55344,443,0,S,3528040379,,65535,,mss
process_id: 15175
source: fw.local.lan
timestamp: 2023-05-19 13:52:21.917 +01:00

I’d like to implement that extractor in a pipeline because
a) it allows for testing one field whilst acting on another therefore avoiding unnecessary regexes
b) pipelines seem like a good idea!

However pipeline functions don’t appear to provide a way of creating a map from two lists in order for me to imitate the extractor’s built-in CSV converter.

Well the following works. But I resorted to assigning each and every element.

// Pipeline rule for pf filterlog IPv4 TCP lines: splits the CSV payload in the
// "message" field and stores the first 23 columns as named message fields.
rule "filterlog IPv4 TCP"
when
  // Fires only when all three hold: a "message" field exists, application_name
  // contains "filterlog", and the message is a whitespace-free prefix followed by
  // ",in,4," or ",out,4," and later ",tcp,".
  has_field("message") &&
  contains(to_string($message.application_name), "filterlog") &&
  regex("^\\S+,(in|out),4,.*,tcp,.*$", to_string($message.message)).matches == true

then
  // Append a sentinel column: per the author, fields (including the last) can be
  // empty, and the trailing "EOL" gives a predictable number of elements from split().
  let message = concat(to_string($message.message), ",EOL");
  let values = split(",", message);
  // Name each column by position. Only indexes 0-22 are mapped; the remaining
  // columns of the line (and the "EOL" sentinel) are not assigned to any field.
  let map = {
    RuleNumber:     values[0],
    SubRuleNumber:  values[1],
    Anchor:         values[2],
    Tracker:        values[3],
    Interface:      values[4],
    Reason:         values[5],
    Action:         values[6],
    Direction:      values[7],
    IPVersion:      values[8],
    TOS:            values[9],
    ECN:            values[10],
    TTL:            values[11],
    ID:             values[12],
    Offset:         values[13],
    Flags:          values[14],
    ProtocolID:     values[15],
    Protocol:       values[16],
    Length:         values[17],
    SourceIP:       values[18],
    DestIP:         values[19],
    SourcePort:     values[20],
    DestPort:       values[21],
    DataLength:     values[22]
    };
  // Write every key/value pair of the map onto the current message.
  set_fields(map);
end

@wdarcey,

with RegEx pipelines, you have to either declare each field in a set like you have in your rule, or declare them inline after each capture. I’ve always found that harder to read, so I’ve stuck with the list method.

You might try GROK, as it lets you include the name as part of the capture, but it’s pretty much the same. We can’t reference an external list as a source of field names.

It seems like a good feature request if you’re so inclined.

1 Like

Thanks Chris. The same precise functionality is available to extractors via the csv converter.
A named capture regex for these csv lines seemed unwieldy.
Some of the fields can be empty, including the last, which is why I added ‘EOL’ so that I received a predictable number of list elements from split(). The existing extractor’s csv converter also appears to take care of that. It would seem to make sense to have that same functionality available to rules, something like Python’s zip(): set_fields(zip(keys, values)).

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.