Pipeline rule to extract json not working

Dear all, I am trying to get a rule to extract the fields of a json message. This is the message:

{"ClientAddr":"109.250.999.999:51659","ClientHost":"109.250.999.999","ClientPort":"51999","ClientUsername":"-","DownstreamStatus":304,"Duration":42174808,"OriginDuration":42120121,"OriginStatus":304,"RequestAddr":"home.xxx-xxx.de","RequestContentSize":0,"RequestCount":18661,"RequestHost":"home.xxx-xxx.de","RequestMethod":"GET","RequestPath":"/ocs/v2.php/apps/notifications/api/v2/notifications","RequestPort":"-","RequestProtocol":"HTTP/2.0","RequestScheme":"https","RetryAttempts":0,"RouterName":"nextcloud-sec@docker","ServiceAddr":"172.19.0.999:999","ServiceName":"xxx-xxx-xxxde@docker","StartLocal":"2022-06-09T11:46:09.752096696Z","StartUTC":"2022-06-09T11:46:09.752096696Z","TLSCipher":"TLS_AES_128_GCM_SHA256","TLSVersion":"1.3","entryPointName":"https","level":"info","msg":"","time":"2022-06-09T11:46:09Z"}

And this is my rule:

rule "extract-json"
when
  starts_with(to_string($message.message), "{") && ends_with(to_string($message.message), "}")
then
  set_field("debug","true");
  let json = parse_json(to_string($message.message));
  let map = to_map(json);
  set_fields(map);
end
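For comparison, the intent of the rule (check that the message body looks like JSON, parse it, and promote each top-level key to a message field) can be sketched in Python. This is only an illustrative sketch of the logic, not Graylog code; the message is shortened from the sample above:

```python
import json

# Shortened Traefik access-log line (fields taken from the sample message above)
message = (
    '{"ClientHost":"109.250.999.999","ClientUsername":"-",'
    '"DownstreamStatus":304,"RequestMethod":"GET"}'
)

def extract_json_fields(raw: str) -> dict:
    """Mimic the pipeline rule: only act on messages that look like JSON."""
    if raw.startswith("{") and raw.endswith("}"):
        # roughly parse_json + to_map in one step
        return json.loads(raw)
    return {}

fields = extract_json_fields(message)
print(fields["ClientHost"])        # 109.250.999.999
print(fields["DownstreamStatus"])  # 304
```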

The debug field is set, but no JSON fields are extracted.
Thank you already for the time you might spend.
Best regards,
Andreas

Hello,

Could you give us some more information? What fields do you want to create, and which versions are you running, i.e. Graylog, Elasticsearch, etc.?
I'm just not sure what you're trying to do.

Hi gsmith, thank you for following up on my question. My setup is Docker-based with:

  • Graylog 4.3.1
  • OpenSearch 1.3.2 (recently switched from Elasticsearch)
  • MongoDB 4.4

I would like to monitor the log files of my GitLab, Nextcloud, and Traefik applications, all running in Docker, with a focus on access logs.
As a first step towards solving my problem I reduced the JSON message fields of the Traefik access log. The JSON extraction for my Beats input now works, but the corresponding pipeline rule still does not process any message.

{"ClientHost":"188.74.28.999","ClientPort":"53502","ClientUsername":"-","OriginStatus":200,"RequestAddr":"home.xxxxxxx.xx","RequestCount":37651,"RequestHost":"home.xxxxxxx.xx","RetryAttempts":0,"RouterName":"nextcloud-sec@docker","ServiceAddr":".xxxxxxx.xx:80","ServiceName":"nextcloud-.xxxxxxx.xx@docker","TLSCipher":"TLS_AES_128_GCM_SHA256","TLSVersion":"1.3","level":"info","msg":"","time":"2022-06-10T10:57:49Z"}

And I would like to make use of pipeline processing rules, as I can apply them per stream and they seem to give me more flexibility in extracting the relevant information. At my present stage that means the client IP address, the user name, retry attempts, and the service name. Currently I am trying to better understand how to use regex in pipeline rules. Maybe this is a better approach?
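As a sketch of what that extraction should produce once the JSON is parsed, here are those four fields picked out of a message in Python. This is illustrative only; the message is shortened from the reduced sample above and the `ServiceName` value is a placeholder:

```python
import json

# Reduced Traefik access-log message (shortened; ServiceName value is a placeholder)
raw = (
    '{"ClientHost":"188.74.28.999","ClientUsername":"-",'
    '"RetryAttempts":0,"ServiceName":"nextcloud@docker"}'
)

record = json.loads(raw)

# The four fields of interest named above
interesting = {k: record[k] for k in
               ("ClientHost", "ClientUsername", "RetryAttempts", "ServiceName")}
print(interesting)
```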

I am thankful for every comment, guidance or hint to other sources of helpful information.
Andreas

Hello,

What are you using to ship these logs to Graylog?

To give you an example: I have Graylog/Elasticsearch/MongoDB in Docker using docker-compose on an Ubuntu server. With Graylog's Sidecar installed, I'm shipping logs with Filebeat to a Beats input on Graylog using port 5044. On the web UI, I configured Filebeat, which the Sidecar controls, as shown below.

# Needed for Graylog
fields_under_root: true
fields.collector_node_id: ${sidecar.nodeName}
fields.gl2_source_collector: ${sidecar.nodeId}

filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/error.log
    - /var/log/access.log
    - /var/log/nextcloud/nextcloud.log
output.logstash:
   hosts: ["8.8.8.8:5044"]
path:
  data: /var/lib/graylog-sidecar/collectors/filebeat/data
  logs: /var/lib/graylog-sidecar/collectors/filebeat/log

By using this setup I do not need to create extra pipelines or extractors.
If the logs are in different formats it would be advisable to create a different INPUT for those messages. This would make it easier to control and configure.

Example:
Syslog messages go into a Syslog UDP/TCP or GELF UDP/TCP input.

Messages from your Traefik applications may not be in those formats, so I would use an input like Raw Plaintext and just create an extractor/pipeline for that input.

@maratoni

To help you further I did a mock-up of the log messages you posted. I'm not as good at pipelines as @tmacgbay is, but I'm decent at extractors with GROK & REGEX.

Example:
I copied the message you posted above and sent it to my Graylog server (Docker).
From there I tested/created a REGEX extractor.

Results:

Also take note, I'm using NXLog with a Syslog UDP input.

Hello and thank you for your detailed reply.

What are you using to ship these logs to Graylog?

I set up 3 inputs so far:

  • Beats for filebeat
  • GELF for the Docker Containers
  • Syslog for the syslog logs

I will think about your advice on using more, and more specific, inputs and on using extractors. Thank you for your example.

In the meantime I tried to get the pipeline rule approach running for the example I posted above. As I am in an early phase of my learning curve, this might be helpful for others:

To get my regex working, I started with this regex tester.

(?<=ClientUsername":")([^\"]*)

The problem is that the regex flavour the tester validates against is not what Graylog accepted here, so this lookbehind approach did not work for me.
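For what it's worth, the lookbehind pattern itself is valid in engines that support lookbehind. As a sanity check of the pattern (not of Graylog's behaviour), here it is run with Python's `re` module against a shortened sample message:

```python
import re

# Shortened sample message from the thread
message = '{"ClientPort":"53502","ClientUsername":"-","OriginStatus":200}'

# Lookbehind: match the value following ClientUsername":" without capturing the key
m = re.search(r'(?<=ClientUsername":")([^"]*)', message)
print(m.group(0))  # -
```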
Then I found this regex page, https://www.regextester.com/109539, which led to a working regex:

(ClientUsername\".\")([^\"]+)

I also had to learn that the regex tester worked even without some of the escape characters, but Graylog did not, so it took some trial and error to arrive at the regex above.
Then I ran into the problem of how to put the regex into the rule correctly. My lesson learned: start with something simple (an easy regex) that you know does what you want it to do.

So this is the result that works for me for the moment:

rule "extract-json"
when
  starts_with(to_string($message.message), "{") && ends_with(to_string($message.message), "}")
then
  let temp = regex("(ClientUsername\".\")([^\"]+)",to_string($message.message));
  set_field("json_clientusername", temp["1"]);
end
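One detail worth noting: Graylog's `regex()` function returns the capture groups in a map keyed from "0", so `temp["1"]` above is the second capture group (the value). Most other engines number groups from 1, so the same pattern read in Python takes the value from group 2. Illustrative check against a shortened sample message:

```python
import re

message = '{"ClientUsername":"-","RetryAttempts":0}'

# Two capture groups: (1) the key plus surrounding quotes, (2) the value
m = re.search(r'(ClientUsername".")([^"]+)', message)
print(m.group(1))  # ClientUsername":"
print(m.group(2))  # -
```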

I am open to any further hints and improvements. And as I said, I will think about my overall strategy for getting the log information properly processed.

Hello @maratoni

Thx for showing the resolution…

Just an idea: you can always check by adding debug() in the pipeline.

debug(temp);

I tested this in the lab using ClientPort:

rule "client"
when
  starts_with(to_string($message.message), "{") && ends_with(to_string($message.message), "}")
then
  let temp = regex("(ClientPort\".\")([^\"]+)",to_string($message.message));
  set_field("clientport", temp["1"]);
  debug(temp);
end
2022-06-13 18:12:48,996 INFO : org.graylog.plugins.pipelineprocessor.ast.functions.Function - PIPELINE DEBUG: {0=ClientPort":", 1=53502}

It seems to work :+1:
