Pipeline chaining and streams

I’m attempting to chain pipelines by using different streams as progress indicators but it’s not working as expected.

From what I can tell, pipeline stages with the same number run together across all connected pipelines: all stage 0 rules first, then all stage 1 rules, and so on. If one pipeline sets fields that another pipeline requires, I need to make sure the dependent pipeline's rules are in higher-numbered stages.
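For reference, here is roughly what I mean; the pipeline and rule names below are just placeholders:

```
pipeline "Cleanup then parse"
stage 0 match either
  rule "flag malformed syslog";
stage 1 match either
  rule "parse cleaned message";
end
```

With that layout, every pipeline connected to the stream finishes its stage 0 rules before any stage 1 rule runs, so fields set in stage 0 are visible to stage 1.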

What I’m attempting to do is have all events land in the default stream. From there, if an event requires cleanup due to something like improper syslog format, a pipeline sends it to a cleanup stream and removes it from the default stream. Once the cleanup pipeline is finished, the event is moved to the processing stream and removed from the cleanup stream. If the event is already valid, it’s sent directly to the processing stream.
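To make that concrete, the routing rules I have in mind look roughly like the sketch below. The stream names, the syslog check, and the cleaned_up marker field are placeholders, and it assumes a Graylog version where route_to_stream supports remove_from_default. The first rule would live in a pipeline attached to the default (entry) stream, the second in a pipeline attached to the cleanup stream; they are shown together here but would be created as separate rules:

```
rule "route malformed syslog to cleanup"
when
  // placeholder check: the raw message does not start with a syslog <PRI> header
  regex("^<\\d+>", to_string($message.message)).matches == false
then
  // send the event to the cleanup stream and drop it from the default stream
  route_to_stream(name: "cleanup", remove_from_default: true);
end

rule "move cleaned events to processing"
when
  // cleaned_up is a hypothetical marker field set by the cleanup rules
  has_field("cleaned_up")
then
  route_to_stream(name: "processing");
  remove_from_stream(name: "cleanup");
end
```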

The processing stream has multiple pipelines, each of which has a field-equals rule as its first stage. Once a pipeline has extracted the structured data from the message field, the event is removed from the processing stream and sent to the appropriate stream for final storage. Currently, all streams use the default index, but eventually the final streams will each have their own index.
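Each of those pipelines is structured roughly like the sketch below, with stage 0 set to "match all" so the later stages only run when the gate rule matches. The vendor field and its value, the src_ip field, and the firewall stream name are made-up examples:

```
rule "is firewall event"
when
  // stage 0 field-equals gate: later stages only run if this matches
  to_string($message.vendor) == "firewall"
then
  // no actions needed; this rule only gates the later stages
end

rule "route firewall event for storage"
when
  // hypothetical field produced by the extraction rules in the earlier stages
  has_field("src_ip")
then
  route_to_stream(name: "firewall");
  remove_from_stream(name: "processing");
end
```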

The chaining doesn’t seem to be working correctly, however: every pipeline appears to run against every event, so the disk journal and process buffer start filling up. Even if I stop the input, the journal and buffer never empty.

How can I efficiently route these streams and chain the pipelines?

Hello @CJRoss,

This sounds quite complicated. Can the logs be sorted into whichever stream they belong to first, and then only have the appropriate rules applied?

Are you utilising much regex matching within your rules?

I think I figured out what happened. Because I was using the default stream as an entry point, the way I was routing to streams caused events to be dropped back into the default stream, creating an infinite loop.

Since stream rules are only evaluated once, when a message is first ingested, I created a new stream and a stream rule to move everything there. This way, if an event gets dropped back into the default stream, it doesn’t automatically get pushed through the pipelines again.

I don’t have any regex in the stream rules, but my pipelines make heavy use of grok patterns. Once I fixed the infinite-loop issue, processing load returned to its previous low levels.
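For reference, the extraction rules are all variations of something like this; the pattern and field names are placeholders, and the referenced grok patterns are assumed to have been imported:

```
rule "extract fields from syslog message"
when
  has_field("message")
then
  // copy the named grok captures onto the event as fields
  set_fields(
    grok(
      pattern: "%{IPV4:src_ip} %{WORD:action} %{GREEDYDATA:detail}",
      value: to_string($message.message),
      only_named_captures: true
    )
  );
end
```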

I agree that this is a somewhat complicated setup, but I needed a way to easily tell when an event didn’t get parsed correctly. It also seems to avoid the problem I had previously, where events weren’t getting processed because of the pipeline stage race condition. Now I know that if an event makes it into a stream, it has already been checked for things like improper syslog formatting.

All that said, I’m always looking for ways to better optimize and profile my workflow. It’s gone through several versions already as I learn more about how Graylog works.

