Graylog stream data not indexing to Elasticsearch via Kafka

Hi All,

I am sending Metricbeat data to a Beats input in Graylog. The data is arriving in the stream and is being stored in an Elasticsearch index (metricbeat_0).

From the stream I am trying to send data to Kafka via Manage Outputs in Graylog. We have created a customized Kafka output plugin; below is its configuration in Manage Outputs:

Graylog-Kafka ID: 5d247036c4566734032f9382
Type: org.graylog.plugins.kafka.KafkaOutput
  
TOPIC: metricbeat
ack: all
batch_size: 16384
broker_server: localhost:9092
buffer_memory: 33554432
linger_ms: 1
retries: 0

When I check the Kafka topic metricbeat I can see the data:

ganeshbabur@localhost:/usr/local/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic metricbeat
{"windows_perfmon_processor_handle_count":379,"gl2_remote_ip":"157.49.222.215","@metadata_version":"6.3.2","gl2_remote_port":59537,"beat_hostname":"localhost","source":"localhost","message":"-","beats_type":"metricbeat","gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"windows","@metadata_beat":"metricbeat","metricset_name":"perfmon","windows_perfmon_processor_handleCount":"dwm","beat_name":"localhost","@timestamp":"2019-07-09T18:25:47.062Z","@metadata_type":"doc","metricset_rtt":21866,"beat_version":"6.3.2","gl2_source_node":"e543c0e3-b76c-4a7c-8e10-e8427d96dcf8","_id":"f9016693-a276-11e9-ac0b-0a580af40001","host_name":"localhost","timestamp":2019-07-09T18:25:47.062Z}
{"gl2_remote_ip":"157.49.222.215","@metadata_version":"6.3.2","gl2_remote_port":59537,"beat_hostname":"localhost","source":"localhost","message":"-","beats_type":"metricbeat","gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"windows","windows_perfmon_processor_name":"svchost","@metadata_beat":"metricbeat","metricset_name":"perfmon","beat_name":"localhost","@timestamp":"2019-07-09T18:25:47.062Z","@metadata_type":"doc","metricset_rtt":23852,"windows_perfmon_processor_workingset_bytes":2.3740416E7,"beat_version":"6.3.2","gl2_source_node":"e543c0e3-b76c-4a7c-8e10-e8427d96dcf8","_id":"f9025102-a276-11e9-ac0b-0a580af40001","host_name":"localhost","timestamp":2019-07-09T18:25:47.062Z}

Now I am using Logstash to write the data from the Kafka topic to an Elasticsearch index. Below is the config I tried:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["metricbeat"]
    auto_commit_interval_ms => "100"
    auto_offset_reset => "latest"
    codec => json
    decorate_events => true
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "testbeat"
  }
  stdout { codec => rubydebug }
}
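
In case it is useful, this is how I am running the pipeline (the config file name here is just an example):

bin/logstash -f kafka-to-es.conf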

I am printing the Logstash output, and below is the error I am getting:

[2019-07-09T18:37:38,596][ERROR][logstash.codecs.json] JSON parse error, original data now in message field {:error=>#<LogStash::Json::ParserError: Unexpected character ('-' (code 45)): was expecting comma to separate Object entries
 at [Source: (String)" {"windows_perfmon_processor_handle_count":366,"gl2_remote_ip":"203.90.4.250","@metadata_version":"6.3.2","gl2_remote_port":61473, "beat_hostname":"localhost","source":"localhost","message":"-","beats_type":"metricbeat", "gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"windows","@metadata_beat":"metricbeat","windows_perfmon_processor_handleCount":"slack","metricset_name":"perfmon","beat_name":"localhost","@timestamp":"2019-07-09T13:15:58.389Z","@metadata_type":"doc","metri"[truncated 205 chars]; line: 1, column: 686]>, :data=>"{\"windows_perfmon_processor_handle_count\":366,\"gl2_remote_ip\":\"203.90.4.250\",\"@metadata_version\":\"6.3.2\",\"gl2_remote_port\":61473,\"beat_hostname\":\"localhost\",\"source\":\"localhost\",\"message\":\"-\",\"beats_type\":\"metricbeat\",\"gl2_source_input\":\"5d19bafac132874d3215704b\",\"metricset_module\":\"windows\",\"@metadata_beat\":\"metricbeat\",\"windows_perfmon_processor_handleCount\":\"slack\",\"metricset_name\":\"perfmon\",\"beat_name\":\"localhost\",\"@timestamp\":\"2019-07-09T13:15:58.389Z\",\"@metadata_type\":\"doc\",\"metricset_rtt\":11981,\"beat_version\":\"6.3.2\",\"gl2_source_node\":\"e543c0e3-b76c-4a7c-8e10-e8427d96dcf8\",\"_id\":\"b167d65c-a24b-11e9-ac0b-0a580af40001\",\"host_name\":\"localhost\",\"timestamp\":2019-07-09T13:15:58.389Z}"}

I can see that the "message" field in the data has the character "-". Why does Logstash fail when the value contains "-"? The testbeat index in Elasticsearch contains the document below, where the raw payload ended up in the message field tagged with _jsonparsefailure (which is what happens when the json codec cannot parse an event):

{
        "_index": "testbeat",
        "_type": "doc",
        "_id": "ed4L2GsBLabQ6slboePR",
        "_score": 1,
        "_source": {
          "@timestamp": "2019-07-09T18:42:48.491Z",
          "message": """{"system_memory_actual_used_bytes":6538346496,"system_memory_actual_used_pct":0.7732,"system_memory_swap_total":24561954816,"gl2_remote_ip":"203.90.4.250","@metadata_version":"6.3.2","gl2_remote_port":61473,"system_memory_total":8455827456,"source":"localhost","beats_type":"metricbeat","gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"system","@metadata_beat":"metricbeat","metricset_name":"memory","beat_name":"localhost","@metadata_type":"doc","system_memory_used_bytes":6538346496,"beat_version":"6.3.2","system_memory_used_pct":0.7732,"gl2_source_node":"e543c0e3-b76c-4a7c-8e10-e8427d96dcf8","system_memory_free":1917480960,"system_memory_swap_free":14430650368,"timestamp":2019-07-09T13:15:58.389Z,"beat_hostname":"localhost","message":"-","@timestamp":"2019-07-09T13:15:58.389Z","system_memory_swap_used_pct":0.4125,"system_memory_actual_free":1917480960,"_id":"b1682470-a24b-11e9-ac0b-0a580af40001","system_memory_swap_used_bytes":10131304448,"host_name":"localhost"}""",
          "tags": [
            "_jsonparsefailure"
          ],
          "@version": "1"
        }
      }

Please correct me if I am doing anything wrong, and let me know your thoughts on how to resolve this issue.

Thanks,
Ganeshbabu R

Are you able to share your output plugin? Without that it is not really easy to assist in any way.

Hi @jan

Sorry for the delay. Please find below the GitHub link for the Kafka output plugin:

https://github.com/Microland/graylog-kafka-output

Also, I have noticed that the data I am sending from the Metricbeat stream to the Kafka topic has a field "timestamp", which is generated by Graylog. The value of this timestamp field is not in double quotes, and that is why the JSON parsing error is happening in Logstash.

Please find below the data from the Kafka topic.

ganeshbabur@localhost:/usr/local/kafka$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic metricbeat
{"windows_perfmon_processor_handle_count":379,"gl2_remote_ip":"157.49.222.215","@metadata_version":"6.3.2","gl2_remote_port":59537,"beat_hostname":"localhost","source":"localhost","message":"-","beats_type":"metricbeat","gl2_source_input":"5d19bafac132874d3215704b","metricset_module":"windows","@metadata_beat":"metricbeat","metricset_name":"perfmon","windows_perfmon_processor_handleCount":"dwm","beat_name":"localhost","@timestamp":"2019-07-09T18:25:47.062Z","@metadata_type":"doc","metricset_rtt":21866,"beat_version":"6.3.2","gl2_source_node":"e543c0e3-b76c-4a7c-8e10-e8427d96dcf8","_id":"f9016693-a276-11e9-ac0b-0a580af40001","host_name":"localhost","timestamp":2019-07-09T18:25:47.062Z}
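
A quick way to confirm that such a record is not valid JSON is to pipe a single message through a JSON parser such as jq (assuming jq is installed); it rejects the unquoted timestamp value:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic metricbeat --from-beginning --max-messages 1 | jq .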

Can you please share your thoughts on why this is happening?

Thanks,
Ganeshbabu R

Hi @jan

I have shared the Kafka output plugin. Did you get a chance to look at this error?
The message being sent to the Kafka topic still has the timestamp without double quotes, and because of that the JSON parse error keeps happening in Logstash while indexing to Elasticsearch.

I have tried removing the timestamp in the Logstash filter, but I am still getting the same error.

If you could share your thoughts on this, it would be very helpful. Please help us figure out how to debug this further.

Thanks,
Ganeshbabu R

Nope, sorry. It is not really realistic that I'm going to debug this plugin in my spare time.

@jan Yes, understood!

As I mentioned above, we tried using grok inside a Logstash filter to remove the "timestamp" field from the message, and now the filter is working fine without any errors.
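
For reference, here is a minimal sketch of that kind of repair (shown here with mutate/gsub rather than the grok we used; the regex is an assumption based on the sample records above). The key point is that with codec => json in the input, a bad event fails before any filter runs, which is why my earlier filter attempt did not help; reading the record as plain text first makes the repair possible:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["metricbeat"]
    # read the raw record as plain text so a bad record does not
    # fail at the codec stage, before the filters can repair it
    codec => plain
  }
}

filter {
  # drop the unquoted Graylog-generated field, e.g.
  # ,"timestamp":2019-07-09T18:25:47.062Z
  mutate {
    gsub => [ "message", ',"timestamp":[^,}]+', "" ]
  }
  # the remaining text should now be valid JSON
  json {
    source => "message"
  }
}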

We have also tried changing the Kafka output plugin code to remove the timestamp from the message before sending it to the topic, and that worked as well.

Please let me know your thoughts.

Thanks,
Ganeshbabu R
