Try these two pipeline rules.
I assume that you already have the email addresses extracted into the field recipients.
The first rule (stage 1) removes the unnecessary characters [, ] and " from the recipients field, splits the value on commas, takes the first email address, extracts its domain, looks the domain up in a lookup table, and sets the field internal (true/false).
rule "email jsonarray 1.1"
when
has_field("recipients") AND contains(to_string($message.recipients), "@")
then
// Remove the unnecessary characters [ ] " from the array field
// (replace_all = true, otherwise only the first match is removed)
let fix_strings = regex_replace("(\\[|\"|\\])", to_string($message.recipients), "", true);
// Split email addresses
let split_emails = split(",", fix_strings);
// Join all emails except the first one (from the second to the last)
let join_emails = join(split_emails, ",", 1, -1);
// Save it to a temporary field (also used in the second rule's condition)
set_field("recipients_tmp", join_emails);
// Extract the domain from the first email address
let extract_domain = regex("@(.*)$", to_string(split_emails[0]));
// Look up the domain in the CSV ("email_domains" = name of the lookup table)
let lookup_internal = lookup_value("email_domains", extract_domain["0"]);
set_field("internal", to_bool(lookup_internal));
end
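To check the logic of the first rule outside Graylog, here is a rough Python simulation of what it does to a single message. It is only a sketch under assumptions: the message is a plain dict, recipients arrives as a JSON-like string, and the "email_domains" lookup table is modeled by a hypothetical dictionary EMAIL_DOMAINS with "false" as its default value. The helper names (lookup_internal, rule_1_1) are made up for illustration. The sketch removes every bracket and quote, which in Graylog requires regex_replace's optional replace_all argument to be true.

```python
import re

# Hypothetical stand-in for the "email_domains" lookup table:
# keys are internal domains, values are the strings stored in the CSV.
EMAIL_DOMAINS = {"mycompany.com": "true"}
DEFAULT_SINGLE_VALUE = "false"  # returned for any domain not in the CSV


def lookup_internal(domain: str) -> bool:
    """Imitate lookup_value("email_domains", domain) followed by to_bool()."""
    return EMAIL_DOMAINS.get(domain, DEFAULT_SINGLE_VALUE) == "true"


def rule_1_1(message: dict) -> dict:
    """Simulate rule "email jsonarray 1.1" on a dict-based message."""
    recipients = message.get("recipients")
    if recipients is None or "@" not in str(recipients):
        return message  # when-condition not met, message unchanged
    # Strip every [ ] and " from the value
    fix_strings = re.sub(r'(\[|"|\])', "", str(recipients))
    split_emails = fix_strings.split(",")
    # All addresses except the first go to the temporary field
    join_emails = ",".join(split_emails[1:])
    if join_emails:  # Graylog does not store empty strings in fields
        message["recipients_tmp"] = join_emails
    # Capture group of "@(.*)$" = domain of the first address
    match = re.search(r"@(.*)$", split_emails[0])
    message["internal"] = lookup_internal(match.group(1) if match else "")
    return message


msg = rule_1_1({"recipients": '["bob@example.org","alice@mycompany.com","carol@example.net"]'})
print(msg["internal"], msg["recipients_tmp"])
# False alice@mycompany.com,carol@example.net
```

With a single recipient, join produces an empty string, so recipients_tmp is never set and the second rule does not fire at all.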
The second rule does the same as the first one, but uses the temporary field recipients_tmp as its input. It runs only while recipients_tmp still contains @ and internal2 is not true, so it keeps running until it finds the first internal domain. Instead of join (which the first rule uses to create recipients_tmp), this rule uses regex_replace, because Graylog can't store an empty string in a field. If it used join like the first rule, the last address would produce an empty string, the field would keep its previous value, and the rule would run many more times than expected.
rule "email jsonarray 1.2"
when
has_field("recipients_tmp") AND contains(to_string($message.recipients_tmp), "@") AND to_bool($message.internal2) == false
then
// Split email addresses
let split_emails = split(",", to_string($message.recipients_tmp));
// Replace the first email (and its trailing comma) with "|" so the field never becomes empty
let replace_first = regex_replace("^[^,]+[,]{0,1}", to_string($message.recipients_tmp), "|");
// Save the remaining emails to the temporary field
set_field("recipients_tmp", to_string(replace_first));
// Extract the domain from the first email address
let extract_domain = regex("@(.*)$", to_string(split_emails[0]));
// Look up the domain in the CSV ("email_domains" = name of the lookup table)
let lookup_internal = lookup_value("email_domains", extract_domain["0"]);
set_field("internal2", to_bool(lookup_internal));
end
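A similar hypothetical Python sketch of the second rule, called once per stage, shows why the "|" placeholder matters: the field loses one address per stage and finally becomes "|", which no longer contains @, so the rule condition fails. As before, the dict message, EMAIL_DOMAINS and the function names are assumptions for illustration, not Graylog APIs.

```python
import re

# Hypothetical stand-in for the "email_domains" lookup table
EMAIL_DOMAINS = {"mycompany.com": "true"}
DEFAULT_SINGLE_VALUE = "false"


def lookup_internal(domain: str) -> bool:
    """Imitate lookup_value("email_domains", domain) followed by to_bool()."""
    return EMAIL_DOMAINS.get(domain, DEFAULT_SINGLE_VALUE) == "true"


def rule_1_2(message: dict) -> bool:
    """Simulate one run of rule "email jsonarray 1.2"; True if it fired."""
    tmp = message.get("recipients_tmp")
    # when: field exists, still contains "@", and internal2 is not true
    if tmp is None or "@" not in tmp or message.get("internal2", False):
        return False
    split_emails = tmp.split(",")
    # Replace the first address and its comma with "|" (first match only),
    # so the field ends up as "|" instead of an empty string
    message["recipients_tmp"] = re.sub(r"^[^,]+[,]{0,1}", "|", tmp, count=1)
    match = re.search(r"@(.*)$", split_emails[0])
    message["internal2"] = lookup_internal(match.group(1) if match else "")
    return True


# Each loop iteration stands in for one pipeline stage (2, 3, 4, ...)
msg = {"recipients_tmp": "carol@example.net,alice@mycompany.com,dave@example.com"}
stage = 2
while rule_1_2(msg):
    print(stage, msg["recipients_tmp"], msg["internal2"])
    stage += 1
# 2 |alice@mycompany.com,dave@example.com False
# 3 |dave@example.com True
```

After stage 3 internal2 is true, so the condition fails and dave@example.com is never checked. The leading "|" left on the next address does no harm, because only the part after @ is looked up.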
How to use:
- Create a lookup table "email_domains" with the Default single value set to false (string). Create a CSV file whose rows contain only the local (internal) domains, not the external ones. Any domain that is not in the CSV is treated as external (internal = false) through the Default single value. Use quotes in the CSV; Graylog needs them.
Format of CSV:
"domain","internal"
"mycompany.com","true"
- Create a new pipeline
- Assign the pipeline rule "email jsonarray 1.1" to stage 1
- Create as many additional stages (2 and up) as the probable maximum number of recipients in the array (e.g. 5 to 10, or more)
- Assign the pipeline rule "email jsonarray 1.2" to every stage from 2 up to the last stage created in the previous point
- Done
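The lookup behaviour can be sketched locally with Python's csv module. This assumes the CSV data adapter is configured with domain as the key column and internal as the value column; load_domains and lookup_with_default are hypothetical helpers that only imitate the lookup table and its Default single value.

```python
import csv
import io

# Contents of the CSV file described above (values quoted)
CSV_TEXT = '"domain","internal"\n"mycompany.com","true"\n'
DEFAULT_SINGLE_VALUE = "false"  # the lookup table's Default single value


def load_domains(text: str) -> dict:
    """Build a domain -> value map (key column "domain", value column "internal")."""
    reader = csv.DictReader(io.StringIO(text))
    return {row["domain"]: row["internal"] for row in reader}


def lookup_with_default(table: dict, key: str) -> str:
    """Return the stored value, or the default for domains not in the CSV."""
    return table.get(key, DEFAULT_SINGLE_VALUE)


domains = load_domains(CSV_TEXT)
print(lookup_with_default(domains, "mycompany.com"))  # true
print(lookup_with_default(domains, "example.org"))    # false
```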
The rules will process the email addresses in the JSON array field one at a time and stop after the first internal domain.
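Because each extra stage checks exactly one more address, the number of stages caps how many recipients get examined. The following compressed Python model (hypothetical names, internal domains hard-coded in INTERNAL_DOMAINS) makes that limit visible; it skips the "|" bookkeeping and only tracks which addresses a pipeline with num_stages stages would look at.

```python
import re

# Hypothetical internal domains (the rows of the CSV lookup table)
INTERNAL_DOMAINS = {"mycompany.com"}


def domain_of(address: str) -> str:
    """Return the part after "@", like the rules' regex("@(.*)$", ...)."""
    match = re.search(r"@(.*)$", address)
    return match.group(1) if match else ""


def run_pipeline(recipients: str, num_stages: int) -> dict:
    """Model stage 1 (rule 1.1) plus stages 2..num_stages (rule 1.2)."""
    emails = re.sub(r'(\[|"|\])', "", recipients).split(",")
    msg = {"internal": domain_of(emails[0]) in INTERNAL_DOMAINS}
    checked = 1
    # Each extra stage looks at exactly one more address
    for address in emails[1:num_stages]:
        if msg.get("internal2"):
            break  # rule 1.2 no longer matches once internal2 is true
        msg["internal2"] = domain_of(address) in INTERNAL_DOMAINS
        checked += 1
    msg["checked"] = checked
    return msg


recipients = '["a@example.org","b@example.net","c@mycompany.com"]'
print(run_pipeline(recipients, 2))
# {'internal': False, 'internal2': False, 'checked': 2}
print(run_pipeline(recipients, 3))
# {'internal': False, 'internal2': True, 'checked': 3}
```

With only two stages, c@mycompany.com is never examined, which is why the number of stages should match the expected maximum number of recipients.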