I want to create an extractor which dynamically creates fields based on the message.
1. Describe your incident:
I have the following message for example:
<13>1 2022-01-20T14:00:54 [log] incidentId;13,creationtime;20.01.2022 13:41:44,severity;medium,description;'xyz,hosts;['lap1:12345']
And have a message with multiple hosts:
<13>1 2022-01-20T14:00:54 [log] incidentId;13,creationtime;20.01.2022 13:41:44,severity;medium,description;'xyz,hosts;['lap1:12345', 'lap2:67890', 'lap3:asadasd']
So the brackets after hosts contain a variable number of hosts.
2. How can the community help?
How can I get Graylog to dynamically extract the hosts and generate a field name for each of them?
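Outside Graylog, the parsing itself is straightforward; as a quick sketch of what "dynamic" extraction means here (the function name and regexes are mine, not from the post), the bracketed list can be pulled apart like this:

```python
import re

def parse_hosts(message: str) -> list[str]:
    """Extract the host:hostid entries from the hosts;[...] segment."""
    m = re.search(r"hosts;\[([^\]]*)\]", message)
    if not m:
        return []
    # Entries are quoted and comma-separated, e.g. 'lap1:12345', 'lap2:67890'
    return re.findall(r"'([^']*)'", m.group(1))

msg = ("<13>1 2022-01-20T14:00:54 [log] incidentId;13,"
       "creationtime;20.01.2022 13:41:44,severity;medium,"
       "description;'xyz,hosts;['lap1:12345', 'lap2:67890', 'lap3:asadasd']")
print(parse_hosts(msg))  # one entry per host, however many there are
```

The hard part, as the thread goes on to discuss, is doing the equivalent of that variable-length loop inside Graylog itself.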
Ummm, I’m speechless
For starters, can you show how you configured your environment to ingest these logs? This will help explain what you need to do to resolve this problem.
Oh I am sorry for the missing parts.
So I have configured a raw UDP input.
I retrieve the logs via an API, process them in a script, and then send the logs to Graylog.
On the input above I want to add a Grok extractor which extracts all the mentioned fields.
I could define a Grok extractor with 10 or 20 optional host-extracting patterns, but I don't think that would be the best solution.
For example, I have the following Grok pattern, which extracts fields from messages of the form shown in my first post:
This pattern extracts the hosts into the field cortex_incident_hosts, which can contain either a single host:hostid or multiple hosts with their hostids.
Therefore I want to create an extractor which runs if cortex_incident_hosts is present and splits all the hosts dynamically into separate fields.
For now I only have an extractor which extracts a single host:hostid value from the [ ] block:
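(The pattern itself was not preserved in the post; a single-host Grok extractor of roughly this shape would match the first entry in the bracket - the field name cortex_incident_host and the pattern are reconstructions, not the original:)

```
hosts;\['(?<cortex_incident_host>[^']+)'
```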
Hope this is a better explanation.
Yes and thank you
Well you have two options that I see.
- You can create as many extractors as needed to sort out ALL your hosts. If this is a small environment that should not be a problem, but in a large environment there may be resource issues.
- A pipeline, which I'm not that good at yet, still learning, and maybe it's not possible, but @tmacgbay would know if this could be done in that fashion.
Could you give us an example of the message received? This would at least give me something to work on.
The interesting thing about pipelines is that you can have the first stage break out only the fields that are common to all messages - like, say, breaking out PaloAlto logs by log type (SYSTEM, TRAFFIC, GLOBALPROTECT…) - then in subsequent stages you can have rules that break things out further based on the data you have already extracted. For instance, have a rule that specifically breaks down SYSTEM logs and a separate rule that further breaks down TRAFFIC logs.
For what you want to do, Graylog doesn't really have a do-until loop (or similar) that you can use to iterate through an unknown number of items. If you are lucky there is a field name associated with each data item… if not, you might have to set a fixed amount that you try. Perhaps there is something in Cortex that allows you to break out separate messages with the host names you are looking at, rather than one message?
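The "fixed amount" approach can be sketched as a single pipeline rule. This is untested and the field names are assumptions; regex() in Graylog pipeline rules returns the capture groups as a map keyed "0", "1", … - here three optional groups give up to three host slots:

```
rule "extract up to three cortex hosts"
when
  has_field("cortex_incident_hosts")
then
  let groups = regex("'([^']+)'(?:, '([^']+)')?(?:, '([^']+)')?",
                     to_string($message.cortex_incident_hosts));
  set_field("cortex_incident_host1", groups["0"]);
  set_field("cortex_incident_host2", groups["1"]);
  set_field("cortex_incident_host3", groups["2"]);
end
```

More hosts would mean more optional groups and more set_field lines - which is exactly the fixed-upper-bound trade-off described above.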
You may have to bear the brunt of having a new field name that has the hostID as its data, which could be a problem since Elasticsearch's (default, but changeable) limit is 1000 fields…
Thank you for your replies.
I am also not good with pipelines right now, but I will give it a try.
Also, these logs are fetched using an API call in a cron job with a bash script I created.
I could split the messages which contain more than one host and send Graylog the same message containing only a single host at a time.
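That splitting step in the sending script could look something like this (sketched in Python rather than bash, and the function name is mine):

```python
import re

def explode_message(message: str) -> list[str]:
    """Return one copy of the message per host in the hosts;[...] segment."""
    m = re.search(r"hosts;\[([^\]]*)\]", message)
    if not m:
        return [message]
    # Each quoted entry, e.g. 'lap1:213123123123'
    hosts = re.findall(r"'[^']*'", m.group(1))
    # Rebuild the message once per host, keeping everything else unchanged
    return [message[:m.start(1)] + h + message[m.end(1):] for h in hosts]
```

Each resulting message then carries exactly one host:hostid, so a plain Grok extractor with a single host field is enough on the Graylog side.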
At the moment the messages look like this:
<13>1 2022-01-20T14:00:54 paloaltocortex paloaltocortex:custom-job:retrieve-new-incidents [log] incidentId;13,creationTimeHuman;20.01.2022 13:41:44,severity;medium,description;'Local Analysis Malware' along with 11 other alerts generated by XDR Agent detected on host;hosts;['lap1:213123123123'],users;['testuser'],alertCategories;['Malware'],url;https://cortexurl.com/incident-view?caseId=13
<13>1 2022-01-20T14:00:54 paloaltocortex paloaltocortex:custom-job:retrieve-new-incidents [log] incidentId;13,creationTimeHuman;20.01.2022 13:41:44,severity;medium,description;'Local Analysis Malware' along with 11 other alerts generated by XDR Agent detected on host;hosts;['lap1:213123123123', 'lap2:11111111', 'lap3:12312312322'],users;['testuser'],alertCategories;['Malware'],url;https://cortexurl.com/incident-view?caseId=13
After looking in the forum I did come across using a CSV file for a “Lookup table”; maybe that can be an option.
Other than that, a pipeline might be something like this. It may or may not work - I gave it a try:
// then-blocks are assumed - the original post only showed the conditions
rule "Host lap1"
when has_field("message") AND contains(to_string($message.message), "lap1:213123123123")
then set_field("cortex_host", "lap1"); end

rule "Host lap2"
when has_field("message") AND contains(to_string($message.message), "lap2:11111111")
then set_field("cortex_host", "lap2"); end

rule "Host lap3"
when has_field("message") AND contains(to_string($message.message), "lap3:12312312322")
then set_field("cortex_host", "lap3"); end