Parsing logs with a pipeline

Hi,

I have logs with this format:

hostname field1=value field2=01 field3="value" etc.

For events from the same source, the order and the number of fields vary; there are several dozen layouts. It is therefore not practical to do this with extractors: the grok combinations (grok | grok) do not return all the fields in all cases, and it becomes unmanageable.

Since the events all start the same way (a hostname with no key/value), followed only by key/value pairs, the single variation being values with or without quotes, I figured it should be possible to parse these events in a pipeline.

However, so far I have only used pipelines to replace prefixes; I don't see how to perform this normalization with a pipeline.

What I would like:

  • For the first field: hostname: value
  • For the others: extract the key and the value as an extractor would, but automatically for all fields, since the syntax is the same everywhere apart from the quotes (see the sketch below).
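
Something like this is what I have in mind, as a rough sketch only (it assumes the stock HOSTNAME and GREEDYDATA grok patterns are available; hostname and kv_part are just names I picked for illustration):

rule "parse hostname then key/values"
when
    has_field("message")
then
    // first token = hostname, the rest = key/value text;
    // only_named_captures avoids setting grok's internal sub-captures as fields
    set_fields(
        grok(
            pattern: "^%{HOSTNAME:hostname} %{GREEDYDATA:kv_part}",
            value: to_string($message.message),
            only_named_captures: true
        )
    );
    // parse the remainder, stripping optional double quotes around values
    set_fields(
        key_value(
            value: to_string($message.kv_part),
            trim_value_chars: "\""
        )
    );
end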

Can you help me?

Thanks

EDIT:

I created a pipeline "A" with one rule:

rule "KV parsing"
when
    has_field("kv")
then
    set_fields(
                fields: 
                        key_value(
                            value: to_string($message.message), 
                            trim_value_chars: "\""
                            )
            );
end  
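
If the rule fires, then for the sample line in my first post I would expect roughly:

field1 → value
field2 → 01
field3 → value   (quotes stripped by trim_value_chars)

One thing I'm not sure of is how key_value treats the leading hostname token, since it contains no "="; cutting it off first (as in the sketch above) sidesteps the question.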

And I connected the pipeline to the stream whose rule matches on gl2_source_input.

I have logs in the stream, but nothing "enters" the pipeline.

EDIT 2:

I've updated the rule to:

rule "KV parsing"
when
    has_field("message")
then
    set_fields(
                fields: 
                        key_value(
                            value: to_string($message.message), 
                            trim_value_chars: "\""
                            )
            );
end  

But I get "Throughput = 0 msg/s". The stream that receives the events is connected to the pipeline, the pipeline has this rule, and the rule works in the simulator.

I don't see what I'm missing.

In the simulator, pasting the raw string into the message field of an event works, but all I get is:

Mutated fields

timestamp
    2020-05-15T21:51:30.132Z
    1589550878

Is that normal?

Hi,

I changed the processing order:

|1|AWS Instance Name Lookup|active|
|2|Message Filter Chain|active|
|3|Pipeline Processor|active|
|4|GeoIP Resolver|active|

Now I see "Throughput: 288 msg/s".

But I do not see any new logs in the stream with the parsed fields :frowning:
I've lost the messages in the stream :frowning:

I rolled back.

Hey @celine,

you would need to route the messages into the streams with the processing pipeline.

Create a last stage where you use the route_to_stream function to place the messages into the streams where they should go.

Hi @jan

So should I do this?:

rule "KV parsing"
when
    has_field("message")
then
    set_fields(
                fields: 
                        key_value(
                            value: to_string($message.message), 
                            trim_value_chars: "\""
                            )
            );
    route_to_stream(id:“STREAM ID”, remove_from_default:true);  
end  

Or should I create a new stage dedicated to routing? If so, what should I put in the when condition? Just check that one of the fields from the previous stage is present?
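
For example, something like this for the routing stage (just a sketch; "STREAM ID" is still a placeholder, and field1 would be one of the fields set by the KV parsing rule in the previous stage):

rule "route parsed messages"
when
    // only route messages that the previous stage actually parsed
    has_field("field1")
then
    route_to_stream(id: "STREAM ID", remove_from_default: true);
end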

Thanks for your time

Hey @celine,

you can do it exactly this way.

But you could also create a new stage and decide, based on the fields that are now present, where each message should be routed.

So you could have messages from application A that should go into stream_a and messages from application B that should go into stream_b, and the routing rule would be similar to:

rule "route application A"
when
    has_field("application") AND
    to_string($message.application) == "A"
then
    route_to_stream(id: "stream id for stream_a", remove_from_default: true);
end

This way you are super flexible in the design of your streams.

Hi @jan

I don't understand where my error is. Let me explain the whole flow:

  • I have an input with ID "A_INPUT" and a stream with ID "A_STREAM".
  • On the stream, I've configured a rule so that all logs coming in through "A_INPUT" are routed to this stream.
  • I can see the "A_INPUT" logs arriving in this stream.
  • I created a pipeline with ID "A_PIPELINE". In this pipeline, I have a stage 0.
  • I connected this pipeline to "A_STREAM".
  • I created a rule with ID "A_RULE".

This rule contains this code:

rule "A_RULE"
when
    has_field("message")
then
    set_fields(
                fields: 
                        key_value(
                            value: to_string($message.message), 
                            trim_value_chars: "\""
                            )
            );
    route_to_stream(id:“STREAM ID”, remove_from_default:true);  
end

  • I associated "A_RULE" with stage 0 of "A_PIPELINE".

I have this processing order:

|1|AWS Instance Name Lookup|active|
|2|Pipeline Processor|active|
|3|Message Filter Chain|active|
|4|GeoIP Resolver|active|

I see "Current throughput: 0 msg/s" on the pipeline details, on the stage 0 details, and on the "KV parsing" rule details.

I think the Message Filter Chain is the processor that:

  • Receives the input logs
  • Parses basic information and adds the standard fields (timestamp, source, level, facility and message)
  • Applies the stream rules

Since the stream rules only run there, a pipeline connected to a stream sees nothing when the Pipeline Processor runs first. So I changed the processing order to:

|1|AWS Instance Name Lookup|active|
|2|Message Filter Chain|active|
|3|Pipeline Processor|active|
|4|GeoIP Resolver|active|

I now see "Current throughput: N msg/s" on the pipeline details, on the stage 0 details, and on the "KV parsing" rule details.

It looks good. I go to the search page of the "A_STREAM" stream: with the route_to_stream function, I should see the parsed logs in this stream.

But since applying this configuration, and despite the events visible in the throughput of the pipeline, stage 0 and the rule, I have no logs in the "A_STREAM" stream.

Thank you

From the given information it should work.

With all the information provided I can only say: yes, it all looks correct, and I do not see any error.

You might want to add debug messages to your processing pipeline rule to verify that it is working. Do you see the messages when you do not search within the stream, i.e. when you search as admin across all messages in the system?
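
For example, a minimal sketch using the built-in debug function (its output goes to the Graylog server log, typically /var/log/graylog-server/server.log):

rule "debug KV parsing"
when
    has_field("message")
then
    // write a marker line into the server log every time the rule fires
    debug(concat("KV parsing saw: ", to_string($message.message)));
end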

Hi @jan

These pipeline problems were really weird.

After a full restart of the Graylog cluster, the pipeline parsing and the route_to_stream function work well. I will continue to investigate.

Thanks
