Pipelines on same stages working on same messages?

We have defined two pipelines, both connected to the stream ‘All Messages’, with different stages (0 and 1).

Original message:
facility: user-level
level: 6
timestamp: 2018-03-20T12:53:38.000Z
source: MyDummyHostName

Picked up by pipeline (stage 0):
rule "security"
when
  has_field("source")
  && contains(to_string($message.source), "MyDummyHostName") == true
then
  set_field("pipeline_rule", "security");
  set_field("hostname", to_string($message.source));
  set_field("source", "syslog");
  route_to_stream(id: "5aa0ec7f3893e1097cc20f65", remove_from_default: true); // security
end

results in Message:
facility: user-level
level: 6
hostname: MyDummyHostName
source: syslog
timestamp: 2018-03-20T12:56:51.000Z

The message above is then also picked up by a different pipeline (stage 1):
rule "Syslog"
when
  has_field("facility") == true
  && has_field("level") == true
  && contains(to_string($message.source), "MyDummyHostName") == false
then
  set_field("pipeline_rule", "Syslog");
  set_field("hostname", to_string($message.source));
  set_field("source", "syslog");
  route_to_stream(id: "5aa0e05f3893e1097cc1a988", remove_from_default: true); // syslog_default_new
end

results in Message:
facility: user-level
hostname: syslog
level: 6
message:
pipeline_rule: Syslog-Default-new
source: syslog
timestamp: 2018-03-20T12:59:40.000Z

My understanding is that the pipeline ‘security’, running as stage 0 and connected to the ‘All Messages’ stream, picks up the message first and does everything defined in its rule. Once that pipeline has finished, any other pipelines connected to the same stream and running on the same stage are run.
In my case, there is no other pipeline running on stage 0.
Next, the second pipeline ‘syslog’, running on stage 1, runs. Somehow that pipeline works on the same message already handled by pipeline ‘security’, which had also moved that message to a different stream (and removed it from the original stream!).
IMHO that’s a fault: this approach results in a message with wrongly defined fields, as ‘hostname’ is now set to ‘syslog’.

So, did I find a bug, or does it ‘work as designed’?
Am I doing something wrong?
Sure, it’s possible to extend the ‘when’ conditions to avoid this issue, but that would increase overhead and maintenance work as those statements get more complex.

When using route_to_stream(), messages are assigned to the new streams after the pipeline has been completed and will then run through the pipelines associated with these new streams.

In your case, it would make sense to split your processing logic into multiple pipelines.

Jochen, thanks for your reply.
But I already have multiple pipelines. In my example I have two pipelines configured.

Pipeline Security (stage 0), attached to stream ‘All Messages’, looking for messages where source = MyDummyHostName, defining some default fields and moving the messages to new ‘security’ stream.

Pipeline Syslog (stage 1), attached to stream ‘All Messages’, looking for messages that have the fields facility and level defined. It also sets some default values and moves these messages to the new ‘syslog’ stream.

So, two pipelines, working on same stream.
But the issue is that both pipelines are working on same messages.

The first pipeline is reading a message looking like (note that field hostname is missing!):
facility: user-level
level: 6
timestamp: 2018-03-20T12:53:38.000Z
source: MyDummyHostName

That pipeline updated the message to be:
facility: user-level
level: 6
hostname: MyDummyHostName
source: syslog
timestamp: 2018-03-20T12:56:51.000Z
And moved that message to new stream ‘security’ - so far so good.

But now the second pipeline ‘syslog’, running on stage 1, reading messages from ‘All Messages’ is picking up the same message.
The result is this message:
facility: user-level
hostname: syslog
level: 6
message:
pipeline_rule: Syslog-Default-new
source: syslog
timestamp: 2018-03-20T12:59:40.000Z

So, I have different pipelines which are moving messages into different streams.
I would have expected that only the one pipeline whose match criteria apply would process a message in ‘All Messages’, since that pipeline has already moved the message.
Unfortunately, this is not the case.
Therefore the pipeline ‘Syslog’ additionally contains the criterion ‘&& contains(to_string($message.source), "MyDummyHostName") == false’.
And from my understanding that shouldn’t be necessary, because the message should already have been moved to a new stream.

Maybe we’re mixing up terminology here. A pipeline has one or more stages and can be assigned to a stream.

Processing in a single pipeline starts in the first stage and continues through the stages if all or one of the rules in the previous stage were successfully completed (i.e. if the condition in the when clause was true).

Think of stages as groups of conditions and actions which need to run in order. All stages with the same priority run at the same time across all connected pipelines.

Quote from http://docs.graylog.org/en/2.4/pages/pipelines/pipelines.html

Since the pipelines run in parallel, the mutations of a message in one pipeline assigned to a specific stream aren’t necessarily available in another pipeline assigned to the same stream in the same stage.

So in your case, I’d create a pipeline attached to “All messages” which only sorts the messages into the correct streams and then process the messages (add/remove message fields) in pipelines attached to the respective streams.
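
A minimal sketch of that layout, reusing the stream IDs and conditions from the rules posted earlier in this thread. The rule and pipeline names are made up for illustration, and each rule and pipeline would be entered separately in the Graylog UI. It assumes the pipeline "Sort into streams" is connected to ‘All Messages’ and the pipeline "Process security" is connected to the ‘security’ stream; a processing pipeline for the syslog stream would follow the same pattern.

```
rule "route security"
when
  has_field("source")
  && contains(to_string($message.source), "MyDummyHostName")
then
  // only sort here, do not modify any fields
  route_to_stream(id: "5aa0ec7f3893e1097cc20f65", remove_from_default: true);
end

rule "route syslog"
when
  has_field("facility")
  && has_field("level")
  && contains(to_string($message.source), "MyDummyHostName") == false
then
  // only sort here, do not modify any fields
  route_to_stream(id: "5aa0e05f3893e1097cc1a988", remove_from_default: true);
end

pipeline "Sort into streams"
stage 0 match either
  rule "route security";
  rule "route syslog";
end

rule "security fields"
when
  has_field("source")
then
  // runs only for messages that have arrived in the security stream
  set_field("pipeline_rule", "security");
  set_field("hostname", to_string($message.source));
  set_field("source", "syslog");
end

pipeline "Process security"
stage 0 match either
  rule "security fields";
end
```

Both sorting rules are still evaluated for every message in ‘All Messages’, so "route syslog" keeps its exclusion. Because the sorting rules never change ‘source’, that exclusion sees the original value, and the fields are rewritten only after the message has reached its target stream.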

You may be right, but I still have doubts.
The pipelines I’m talking about each have only one stage.

There is also only one pipeline with stage 0.
Therefore, this pipeline has completely finished once the rule of stage 0 has been processed.
And yet another pipeline (Syslog, stage 1) processes the same message…

Yes, because the updated stream information is applied after all pipelines assigned to the current stream have been completed.

If you’re interested in the internals, you can take a look at the code of the processing pipeline:

And that fact was the missing item :)

Thanks for clarification.
