Pipeline Rules to replace Extractor?

Hi,

I’m trying to create a pipeline rule that works similarly to the extractor I created for a Filebeat input.

The extractor that works for exchange tracking logs:

{
"extractors": [
{
"title": "ex-msg-trk",
"extractor_type": "grok",
"converters": [],
"order": 0,
"cursor_strategy": "copy",
"source_field": "message",
"target_field": "",
"extractor_config": {
"grok_pattern": "(%{TIMESTAMP_ISO8601:date-time})?,(%{IPORHOST:client-ip})?,(%{IPORHOST:client-hostname})?,(%{IPORHOST:server-ip})?,(%{IPORHOST:server-hostname})?,(%{GREEDYDATA:source-context})?,(%{GREEDYDATA:connector-id})?,(%{WORD:source})?,(%{WORD:event-id})?,(%{NUMBER:internal-message-id})?,(%{GREEDYDATA:message-id})?,(%{GREEDYDATA:recipient-address})?,(%{GREEDYDATA:recipient-status})?,(%{NUMBER:total-bytes})?,(%{NUMBER:recipient-count})?,(%{GREEDYDATA:related-recipient-address})?,(%{GREEDYDATA:reference})?,(%{GREEDYDATA:message-subject})?,(%{GREEDYDATA:sender-address})?,(%{GREEDYDATA:return-path})?,(%{GREEDYDATA:message-info})?,(%{WORD:directionality})?,(%{GREEDYDATA:tenant-id})?,(%{IPORHOST:original-client-ip})?,(%{IPORHOST:original-server-ip})?,(%{GREEDYDATA:custom-data})?",
"named_captures_only": true
},
"condition_type": "none",
"condition_value": ""
}
],
"version": "2.2.0-SNAPSHOT"
}

I have Filebeat collecting Microsoft Exchange logs and adding a type of ex-msg-trk-mailbox or ex-msg-trk-transport.
I created a stream called “stream ex-msg-trk”, which grabs the messages using these stream rules:
Field type must match exactly ex-msg-trk-mailbox (Exchange stream rule ex-msg-trk-mailbox)
Field type must match exactly ex-msg-trk-transport (Exchange stream rule ex-msg-trk-transport)

I then created a pipeline called “pipeline ex-msg-trk” connected to “stream ex-msg-trk”.
I added a stage 0 set to “At least one of the rules on this stage matches the message”,
which is the point at which I’m stuck: it’s not processing the messages.

I added two rules:

Rule
ex-msg-trk-mailbox
ex-msg-trk-transport

With the rule I want to do basically what I’ve done with the extractor, but have it apply only to messages that match this rule.

rule "type ex-msg-trk-mailbox"
when
  // Run only for messages that have a "type" field whose string value is
  // exactly "ex-msg-trk-mailbox".
 has_field("type") && to_string($message.type) == "ex-msg-trk-mailbox"
then
  // Read the "message" field as a string; this is the text the grok pattern is applied to.
let message_field = to_string($message.message);
  // Apply the grok pattern to that text. The pattern is a list of comma-separated
  // columns, each wrapped in an optional group "(...)?" so an empty column can still match.
  // only_named_captures: true asks grok for the named captures only.
let action = grok(pattern: "(%{TIMESTAMP_ISO8601:date-time})?,(%{IPORHOST:client-ip})?,(%{IPORHOST:client-hostname})?,(%{IPORHOST:server-ip})?,(%{IPORHOST:server-hostname})?,(%{GREEDYDATA:source-context})?,(%{GREEDYDATA:connector-id})?,(%{WORD:source})?,(%{WORD:event-id})?,(%{NUMBER:internal-message-id})?,(%{GREEDYDATA:message-id})?,(%{GREEDYDATA:recipient-address})?,(%{GREEDYDATA:recipient-status})?,(%{NUMBER:total-bytes})?,(%{NUMBER:recipient-count})?,(%{GREEDYDATA:related-recipient-address})?,(%{GREEDYDATA:reference})?,(%{GREEDYDATA:message-subject})?,(%{GREEDYDATA:sender-address})?,(%{GREEDYDATA:return-path})?,(%{GREEDYDATA:message-info})?,(%{WORD:directionality})?,(%{GREEDYDATA:tenant-id})?,(%{IPORHOST:original-client-ip})?,(%{IPORHOST:original-server-ip})?,(%{GREEDYDATA:custom-data})?", value: message_field,  only_named_captures: true);
  // Pass the grok result to set_fields so the captures are set as fields on the message.
  set_fields(action);
end

What am I missing or doing wrong?

I also tried a rule like:

rule "type ex-msg-trk-transport try 2"
when
  // Substring test: true when the string value of "type" contains
  // "ex-msg-trk-transport" anywhere, not only when it equals it exactly.
  contains(to_string($message.type), "ex-msg-trk-transport")
then
  // Read the "message" field as a string; this is the text the grok pattern is applied to.
let message_field = to_string($message.message);
  // Apply the grok pattern to that text. The pattern is a list of comma-separated
  // columns, each wrapped in an optional group "(...)?" so an empty column can still match.
  // only_named_captures: true asks grok for the named captures only.
let action = grok(pattern: "(%{TIMESTAMP_ISO8601:date-time})?,(%{IPORHOST:client-ip})?,(%{IPORHOST:client-hostname})?,(%{IPORHOST:server-ip})?,(%{IPORHOST:server-hostname})?,(%{GREEDYDATA:source-context})?,(%{GREEDYDATA:connector-id})?,(%{WORD:source})?,(%{WORD:event-id})?,(%{NUMBER:internal-message-id})?,(%{GREEDYDATA:message-id})?,(%{GREEDYDATA:recipient-address})?,(%{GREEDYDATA:recipient-status})?,(%{NUMBER:total-bytes})?,(%{NUMBER:recipient-count})?,(%{GREEDYDATA:related-recipient-address})?,(%{GREEDYDATA:reference})?,(%{GREEDYDATA:message-subject})?,(%{GREEDYDATA:sender-address})?,(%{GREEDYDATA:return-path})?,(%{GREEDYDATA:message-info})?,(%{WORD:directionality})?,(%{GREEDYDATA:tenant-id})?,(%{IPORHOST:original-client-ip})?,(%{IPORHOST:original-server-ip})?,(%{GREEDYDATA:custom-data})?", value: message_field,  only_named_captures: true);
  // Pass the grok result to set_fields so the captures are set as fields on the message.
  set_fields(action);
end

The first rule I tried is working. I needed to:

enable the Pipeline Processor message processor and place it after the Message Filter Chain in the message processor order. Both are required if you want to use pipelines on your streams and have access to fields coming from extractors.