Message clone plus route_to_stream in pipeline processor

Hi all,

I'm trying to set up a separate stream with less information for a specific technical API user.

rule "clone_message_to_reroute_stream_C2S_C2Svp"
when
    has_field("service") &&
    contains(to_string($message.service), "C2S_C2Svp")
then
    let x = clone_message();
    route_to_stream(id: "603fa731e859696e66878829", message: x);
end

As you can see, the rule is pretty simple and is applied in the last stage.

The stream has a custom index and is started, but doesn't have any stream rules applied.

The rule doesn't show any throughput, but no errors either. I even tried it with "when true" and nothing was written to the new stream.

Am I missing something?

Hello @riskersen,

Since you’re not also specifying to remove the cloned message from the default stream, have you checked for the duplicate message there? Is it successfully cloning?

With route_to_stream, shouldn't it end up only in the destination stream? The original message is left intact in its original stream.

I will check the default stream tomorrow, but if the messages were there, shouldn't I see some msg/s in the throughput?

Thanks anyway!

Unless you're specifically suppressing the duplication to the default stream, the cloned message will appear there too.

https://docs.graylog.org/en/4.0/pages/pipelines/functions.html#route-to-stream
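
For reference, a minimal sketch of what that would look like, using the remove_from_default parameter from the linked docs (stream ID taken from the rule above):

rule "clone_message_to_reroute_stream_C2S_C2Svp"
when
    has_field("service") &&
    contains(to_string($message.service), "C2S_C2Svp")
then
    let x = clone_message();
    // remove_from_default: true keeps the routed clone out of the default stream
    route_to_stream(id: "603fa731e859696e66878829", message: x, remove_from_default: true);
end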

I added remove_from_default to the rule and also had a look at the All messages stream, but the messages weren't there.

It looks like my when clause is somehow broken.

The best way to debug a when clause is to use the debug() function, like this:
debug(concat("cloned message: ", to_string(x)))

Then check the Graylog server log under /var/log/graylog-server:
tail -f /var/log/graylog-server/server.log

https://docs.graylog.org/en/4.0/pages/pipelines/functions.html#debug
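
As a fuller example, here is a sketch of the rule with debug() calls in the then block, so the server log shows both whether the rule fired and what the clone contains (the log messages themselves are arbitrary):

rule "clone_message_to_reroute_stream_C2S_C2Svp"
when
    has_field("service") &&
    contains(to_string($message.service), "C2S_C2Svp")
then
    debug("clone rule matched");                     // proves the when clause evaluated to true
    let x = clone_message();
    debug(concat("cloned message: ", to_string(x))); // dumps the cloned message content
    route_to_stream(id: "603fa731e859696e66878829", message: x);
end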


Thanks.

I made some strange progress. I changed the when clause to when true and also to a more specific field, and with that the CPU usage exploded; I wasn't even able to fix the rule because the web UI was unusable. No documents were written to Elasticsearch at all. I'm not sure what Graylog was doing during the CPU peak.

After changing the rule from contains(to_string($message.source), "DEFRAFW09") to something that won't match (edited directly in MongoDB, since the UI was unusable), performance is back to normal.

Just for the notes, in case someone else is interested:
db.pipeline_processor_rules.update(
    { "_id": ObjectId("603fa988e859696e66878ab8") },
    { $set: { source: "rule \"clone_C2S_C2Svp_message\"\nwhen\n// has_field(\"service\") &&\n// to_string($message.service) == \"srv_S-C2S_C2Svp\"\n  has_field(\"DEFRAFW09\")\nthen\n  let x = clone_message();\n  route_to_stream(id: \"603fa731e859696e66878829\", message: x, remove_from_default: true);\nend" } }
)
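
If you need to do the same, you can list the rules in that collection first to find the right ObjectId (mongo shell sketch):

// print all pipeline rules, including their _id and source
db.pipeline_processor_rules.find().pretty()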

Throughput is currently around 4k msg/s on the attached stream, and roughly a quarter of the messages would match the source pattern.

Is the clone_message + route_to_stream combination such a performance issue?

I found the issue: the pipeline rule was connected to a stage that wasn't reached by this specific message.

It seems I had a misunderstanding about the stage processing. I thought a message goes through all stages even if it doesn't match a previous stage; in fact, a message only continues to the next stage if the previous stage's continuation condition (all rules match, or at least one rule matches) is satisfied.
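
If later stages must see every message, one workaround (a sketch, assuming the earlier stage is set to continue when at least one rule matches) is an always-matching pass-through rule in that stage:

rule "pass-through"
when
    true
then
    // no actions needed; matching alone lets the message continue to the next stage
end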

The debug output helped a lot at this point.


Nice job fixing it, thanks for sharing the solution!

