Cannot copy a message to 2 indexes with different fields per index

1. Describe your incident:
I cannot copy a message to 2 indexes with different fields per index.

3. What steps have you already taken to try and solve the problem?
Tried ordering the stages differently.
Tried applying Stream Tags in the Stream view instead of in the pipeline.
Tried putting the “remove” rule in a different pipeline.

4. How can the community help?
I created one pipeline, connected to the “All messages” stream, with these rules:
STAGE 0
rule "pipeline with user"
when
  true
then
  let nocomma = to_string($message.message);
  // GROK message & overwrite ingest timestamp with logfile timestamp
  let groked_message = grok(pattern: "%{TIMESTAMP_ISO8601:logdate}\t ?%{DATA:loglevel}\t ?%{DATA:class}\t ?(%{DATA:user})?", value: nocomma, only_named_captures: true);
  set_fields(groked_message);
  route_to_stream(id:"64463448d997114cd1cb3bbd",remove_from_default:true);
end

STAGE 1
rule "remove User and store in other index"
when
  true
then
  remove_field("user");
  route_to_stream("5de0dc734bc67705e4eb022d");
end

Each stream ID has a different ES index assigned.
The problem is that the “remove_field()” call removes the user field from both streams. How can I end up with one stream that contains the user field and another stream that does not?

When you route your message to another stream, you are only adding that stream's ID to the message's “streams” field. It is still one and the same message, so remove_field() removes the user field from the single message that both streams see.

What you need to do is clone the message with all its fields and then remove the user field from the clone only.

If you don’t want to store the message (almost) twice in your database, you might consider decorators to hide/scramble the users.
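
To make the difference between routing and cloning concrete, here is a toy model in Python. It is not Graylog code: the `Message` class, the helper functions and the stream names `with_user` and `without_user` are all made up for illustration. It also assumes a clone starts with no stream membership, which may differ from how clone_message() behaves in your Graylog version.

```python
import copy


class Message:
    """Toy stand-in for a message: a field map plus a set of stream IDs."""

    def __init__(self, fields):
        self.fields = dict(fields)
        self.streams = set()


def route_to_stream(message, stream_id):
    # Routing only tags the existing message; no copy is made.
    message.streams.add(stream_id)


def clone_message(message):
    # Independent copy of the fields (assumption: the clone starts unrouted).
    return Message(copy.deepcopy(message.fields))


def index_contents(messages):
    # Map each stream ID to the field dicts that would be written for it.
    out = {}
    for m in messages:
        for stream_id in m.streams:
            out.setdefault(stream_id, []).append(m.fields)
    return out


def route_only():
    # Mirrors the original two-stage attempt: one message, two stream tags.
    msg = Message({"loglevel": "INFO", "user": "alice"})
    route_to_stream(msg, "with_user")
    msg.fields.pop("user", None)  # affects the only message there is
    route_to_stream(msg, "without_user")
    return index_contents([msg])


def route_and_clone():
    # Mirrors the clone approach: the field is removed from the copy only.
    msg = Message({"loglevel": "INFO", "user": "alice"})
    route_to_stream(msg, "with_user")
    clone = clone_message(msg)
    clone.fields.pop("user", None)
    route_to_stream(clone, "without_user")
    return index_contents([msg, clone])


if __name__ == "__main__":
    print(route_only())
    print(route_and_clone())
```

In the first scenario both streams end up with the same field map, without the user. In the second, two separate messages exist, which is also why the storage cost goes up.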

Thank you, @ihe.
Clone did the job.
For others going the same route, my final solution looked similar to this:
STAGE 0
rule "fork messages"
when
  true
then
  let nocomma = to_string($message.message);
  // GROK message & overwrite ingest timestamp with logfile timestamp
  let groked_message = grok(pattern: "%{TIMESTAMP_ISO8601:logdate}\t ?%{DATA:loglevel}\t ?%{DATA:class}\t ?(%{DATA:user})?", value: nocomma, only_named_captures: true);
  set_fields(groked_message);
  route_to_stream(id:"64463448d997114cd1cb3bbd",remove_from_default:true);
  let x = clone_message();
  remove_field("user", x);
  route_to_stream(id:"5de0dc734bc67705e4eb022d", message:x);
end

This allows me to give some of my users a stream with fewer fields. But of course the space consumption nearly doubled.

