Pipeline Instead of JSON Extractor for Office/Azure Collector

Good Afternoon:

I currently use and benefit (greatly) from @ddbnl's Office365/Azure Collector.

Unfortunately (for me) the extractor they provide also stores a lot of extraneous information, including one field that causes ~2,000 errors per 24 hours (Example):

OpenSearchException[OpenSearch exception [type=mapper_parsing_exception, reason=failed to parse field [ListBaseType] of type [long] in document with id '09fb8989-436d-11ee-bb9b-9acc4b3b621e'. Preview of field's value: 'DocumentLibrary']]; nested: OpenSearchException[OpenSearch exception [type=illegal_argument_exception, reason=For input string: "DocumentLibrary"]];

I understand a better approach would be to create a Pipeline + Rules. Unfortunately, Pipelines are well outside of my comfort zone.

So my question is whether anyone could provide a few breadcrumbs on how to extract just certain fields from a message. For example: “Operation”, “Record Type”, “Device Properties”, etc. from a message like:

{
  "AzureActiveDirectoryEventType": 1,
  "gl2_remote_ip": "192.168.128.117",
  "gl2_remote_port": 41596,
  "UserKey": "<redacted>",
  "ActorIpAddress": "<redacted>",
  "source": "192.168.128.117",
  "Operation": "UserLoginFailed",
  "OrganizationId": "<redacted>",
  "gl2_source_input": "<redacted>",
  "ExtendedProperties": "{Name=ResultStatusDetail, Value=UserError}, {Name=UserAgent, Value=Windows-AzureAD-Authentication-Provider/1.0}, {Name=UserAuthenticationMethod, Value=262144}, {Name=RequestType, Value=OAuth2:Token}",
  "IntraSystemId": "d0b9c2a4-ee31-4130-b2f8-03fb7ed56600",
  "Target": "{ID=<redacted>, Type=0}",
  "RecordType": 15,
  
...
}

If there is an easier way, like continuing to use the JSON extractor but with the ability to “ignore” certain fields, I am all ears.

As always, thank you!

Pipelines do take a bit to warm up to and are not the most intuitive, but once you have done it a few times it will make a lot of sense.

To directly answer your question, I would approach this with a pipeline rule that deletes whatever fields I don’t want. The general idea with pipelines is:

1. Create a pipeline and “connect it” to an applicable stream

A pipeline must be connected to at least 1 stream before it processes anything, and it can be attached to more than 1. This ensures that ONLY the messages in those streams are processed by the pipeline.

2. Create a pipeline rule

  • This is the tough part, easier said than done
  • The gist of it is that pipeline rules have 3 essential components:
    1. Rule name
    2. Condition (which messages to apply the rule to)
    3. Action (what the rule will do to the messages that match the condition)

For this specific example, we can keep it as simple as possible:

rule "DROP unwanted M365 fields"
when
    to_string($message.whatever_field_name) == ""
then
    remove_field("field_name_to_remove");
end

To break this down:

rule "DROP unwanted M365 fields" is the rule name. This is what we will use to add this pipeline rule to the pipeline (that is linked to 1 or more streams)

when
    to_string($message.whatever_field_name) == ""

This is the condition. You do unfortunately have to explicitly cast the variables, so in the above example we use the function to_string(). Inside the parentheses is the field we want to compare. Graylog treats the message more or less as a JSON object, meaning that the entire message including ALL fields is contained within $message, and we can access the fields within the message using $message.fieldname, where fieldname is the name of the field you see in Graylog. You can have more than 1 condition, for example condition 1 and condition 2:

when
    to_string($message.whatever_field_name) == ""
    && to_string($message.another_field_name) == ""

Lastly we have the actions:

then
    remove_field("field_name_to_remove");
end

This is where we can tell the rule to do anything we need it to do. The quick reference to the right of the pipeline rule editor lists the available functions and their syntax. For this example we will only focus on remove_field. We can use this function to remove a field by its field name. We can remove more than 1 field, but will need one remove_field() call per field to do so:

then
    remove_field("field_name_to_remove");
    remove_field("another_field_name_to_remove");
end

Each action MUST end with a semicolon (;)

Lastly lastly, the rule MUST end with end.
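
Putting the pieces together, a complete rule might look like the sketch below. It assumes the field to drop is ListBaseType (the one from the error in the first post) and uses has_field() as the condition, so the rule only fires when that field is actually present. Keep in mind that to_string() returns an empty string for a missing field, so a == "" condition like the template above matches only messages where the field is absent or empty. The rule name is just a placeholder.

```
rule "DROP ListBaseType when present"
when
    // true only if the message carries this field
    has_field("ListBaseType")
then
    // drop the field before the message is indexed
    remove_field("ListBaseType");
end
```

Add one remove_field() line per additional field you want gone.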

3. Add the pipeline rule to the pipeline

At this point you should have your pipeline created, connected to at least one stream, and at least 1 pipeline rule created. From here we can add the rule to the pipeline. Click Edit beneath the Stage 0 section, click in the ‘select’ box beneath Stage rules, and select the rule created above. Notice the name matches the name specified in the rule "rulename" line of the pipeline rule.

That's the long and short of it. Hope that helps.

Thank you! I will try this shortly and mark “solution”, then!

Unfortunately it would appear that I did something wrong. Here is my rule:

rule "DROP unwanted M365/Azure fields"
when
  to_string($message.ClientIP) == ""
then
  remove_field("FileSizeBytes");
  remove_field("HighPriorityMediaProcessing");
  remove_field("ListBaseType");
  remove_field("ListItemUniqueId");
  remove_field("ListId");
  remove_field("ListServerTemplate");
  remove_field("UserKey");
  remove_field("UserType");
  remove_field("Webid");
end

However, of note, things like “ListBaseType” remain. Perhaps I misunderstand the to_string($message.ClientIP) == "" as I utilized a field - ClientIP - that is in every message. Does it matter that this field - ClientIP - exists because of the JSON Extractor that runs at input, prior to Stream?

What does your processors configuration order look like?

Via System / Configurations / Message Processors

Can you post a screenshot?

Perhaps of note, my pipeline:

(Steps 0 - 1 work fine)

That is interesting, can you try to use the input ID instead?

  1. On the search page, click on the fields icon in the leftmost menu
  2. Search for gl2_source_input and click on “all including reserved”
  3. Click the field to reveal its action menu, select “Add to all tables”

This will tell you the value of the input that you can use in your pipeline rule. This field is guaranteed to be present since it is added by the input as soon as the message is received.

So your rule condition would look something like:

rule "rulename"
when
    to_string($message.gl2_source_input) == "6387955208eb175f71dbe2e2"
then
    remove_field("ListBaseType");
end
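
An alternative sketch of the same check, assuming your Graylog version ships the built-in from_input() function: it tests which input received the message without comparing strings by hand. The input ID is the example value from above and the field names are taken from the rule posted earlier, so swap in your own.

```
rule "DROP unwanted M365/Azure fields (by input)"
when
    // matches messages received by the input with this ID
    from_input(id: "6387955208eb175f71dbe2e2")
then
    remove_field("ListBaseType");
    remove_field("ListItemUniqueId");
end
```

Either form of the condition should select the same messages, since both key off the input that received them.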

That seems to have done the trick!

My working theory as to why “ClientIP” did not work has to do with the timing of the extractor - which creates the fields - and the timing of the Pipeline processing.

Irrespective, case closed! Thank you!
