Need help on lookup table JSON


(Blason) #1

Hi Team,

I need help with the JSONPath settings of a lookup table. I have set up a separate Elasticsearch server which is ingesting data from a CSV file like this:

domains,attack,severity
mountchart.com,APT,medium
mountcrypt.top,PHISHING,High
mountexercise.com,RANSOMWARE,High
mountfacenepal.com,APT,medium
mountintegrity.com,CVE123-234,low
mountlanka.net,CRYPTO,low
mountliterark.in,MALSPAM,medium
mountmoney.top,Emotet,Low

The data is being fed into my Elasticsearch server properly, and I want to use it as a lookup table on my Graylog server, hence I configured a Data Adapter.

I am confused about the single value JSONPath and the multi value JSONPath. What should be used there so that, when the "mountmoney.top" domain is matched in my BIND logs, new fields are added with the attack name (i.e. Emotet) and the severity (Low)?

Here is my JSON value from elasticsearch

{
   "_index":"logstash-doms-2018.09.22",
   "_type":"doc",
   "_id":"jygoAmYBCOgodINNHae0",
   "_version":1,
   "_score":null,
   "_source":{
      "domains":"mountvalley.org",
      "attack":"DGA",
      "message":"mountvalley.org,DGA,High",
      "@timestamp":"2018-09-22T16:41:12.820Z",
      "@version":"1",
      "severity":"High",
      "host":"dsv.iqn.in",
      "path":"/var/log/dsv.csv"
   },
   "fields":{
      "@timestamp":[
         "2018-09-22T16:41:12.820Z"
      ]
   },
   "sort":[
      1537634472820
   ]
}

Can someone please suggest?

TIA
Blason R


(Blason) #2

Here is what I tried on my end, but I am still receiving a null value.

Lookup URL

http://192.168.5.18:9200/logstash-*/_search?q=${key}

Single value JSONPath

$._index.domains

Multi value JSONPath

$._index[*]

HTTP User-Agent

Graylog Lookup - https://www.graylog.org/


(Blason) #3

Hi Team,

Any clue here pls? I am running out of ideas and would really appreciate if someone can offer help!!

TIA


(Philipp Ruland) #4

Heyo @blason,

your JSONPath is looking under the wrong node. The value you want is under $._source.domains, but you are looking at $._index.domains, which resolves to null since _index does not have any children :slight_smile:

Single and multi value do what their names suggest. Single value expects you to provide a JSONPath that points to a single field in the JSON returned by the lookup. The multi value path expects a JSONPath that returns any number of fields or even objects. This is basically to cover multiple use cases. E.g. in one processing pipeline you only need the attack field to route the messages, and in another you need all three values to enrich the message.
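To make the difference concrete, here is a minimal Python sketch that walks a trimmed copy of the document from post #1 with plain dictionary access instead of a JSONPath library. The helper `resolve` is hypothetical and only handles simple dotted paths such as `$._source.domains`; it is an illustration, not how Graylog evaluates paths.

```python
# Trimmed copy of the document posted in #1.
doc = {
    "_index": "logstash-doms-2018.09.22",
    "_source": {
        "domains": "mountvalley.org",
        "attack": "DGA",
        "severity": "High",
    },
}


def resolve(document, path):
    """Follow a simple dotted path like '$._source.domains'; return None on a miss."""
    node = document
    for key in path.lstrip("$").strip(".").split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


single_wrong = resolve(doc, "$._index.domains")   # _index is a string, it has no children
single_right = resolve(doc, "$._source.domains")  # exactly one scalar value
multi = resolve(doc, "$._source")                 # the whole object with all fields

print(single_wrong)
print(single_right)
print(sorted(multi))
```

The first path misses because `_index` holds a plain string, while `$._source` hands back every field at once, which is what a multi value path is meant for.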

Greetings,
Philipp


(Blason) #5

Hi @derPhlipsi

Thanks for the reply. I also tried that, using _source.domains, but it still returns a null value. Is there anything else I can check?


(Philipp Ruland) #6

Well, using your provided data, this JSONPath works:
$._source.domains

And your multi-value should be this.
$._source


(Blason) #7

Thanks man. Let me spin up my instance. What I need to achieve with the lookup table is that when _source.domains is found, a field is added with the value from attack.

Thanks again.


(Philipp Ruland) #8

Little waypoint you should visit to get that:
http://docs.graylog.org/en/2.4/pages/lookuptables.html#pipeline-rules


(Blason) #9

Hi,

I was pretty sure this was not right, as even the suggested changes are returning null values. I guess maybe my query URL is not valid?

Can you please help?

http://192.168.1.15:9200/logstash-doms-*/_search?q=${key}


(Philipp Ruland) #10

What is the response when you request the URL with curl? E.g.:

curl -X GET "http://192.168.1.15:9200/logstash-doms-*/_search?q=0009900025.000webhostapp.com"

I think I typed the hostname correctly. I get a phishing warning by my browser at least :smiley:

Greetings,
Philipp

PS: Remember to pretty-print the output with the ?pretty=true parameter in the URL above, and put triple backticks around it to get better formatting in the forum :slight_smile:


(Blason) #11

How do I combine pretty with q using curl? Is that possible? I am not aware though.

However Kibana page returns proper message


(Philipp Ruland) #12

Simply append the parameter like this: (drop the ? since you already have it and substitute it with a & because you are chaining multiple params)

curl -X GET "http://192.168.1.15:9200/logstash-doms-*/_search?q=0009900025.000webhostapp.com&pretty=true"
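The same chaining rule can be sketched in Python with the standard library, in case it helps to see how the query string is assembled. The host, index pattern and domain are the ones from this thread; nothing here talks to Elasticsearch, it only builds the URL.

```python
from urllib.parse import urlencode

# Host and index pattern as used in this thread.
base = "http://192.168.1.15:9200/logstash-doms-*/_search"

# The first parameter follows "?", every further one is joined with "&".
params = {"q": "0009900025.000webhostapp.com", "pretty": "true"}
url = base + "?" + urlencode(params)

print(url)
```

On a shell, the whole URL has to stay inside quotes, otherwise the `&` is interpreted by the shell instead of being sent as part of the request.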

But you could also copy the output from Kibana. I think there is a copy function there somewhere, I haven’t used it in a year now :smiley:


(Blason) #13

Yep I was forgetting ".

Anyway, here is the output:

{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 1.9061548,
    "hits" : [
      {
        "_index" : "logstash-doms-2018.09.24",
        "_type" : "logs",
        "_id" : "AWYLQFFsFACyq4pzg8EE",
        "_score" : 1.9061548,
        "_source" : {
          "severity" : "low",
          "path" : "/var/log/dsv.csv",
          "logstash_time" : 0.002000093460083008,
          "@timestamp" : "2018-09-24T11:04:13.691Z",
          "attack" : "EMOTET",
          "@version" : "1",
          "host" : "0.0.0.0",
          "domains" : "0009900025.000webhostapp.com",
          "message" : "\"0009900025.000webhostapp.com\",\"EMOTET\",\"low\"",
          "tags" : [
            "_grokparsefailure",
            "_geoip_lookup_failure"
          ]
        }
      },
      {
        "_index" : "logstash-doms-2018.09.24",
        "_type" : "logs",
        "_id" : "AWYLObjdFACyq4pzg8Db",
        "_score" : 1.6554626,
        "_source" : {
          "severity" : "low",
          "path" : "/var/log/dsv.csv",
          "logstash_time" : 0.002000093460083008,
          "@timestamp" : "2018-09-24T10:57:02.129Z",
          "attack" : "EMOTET",
          "@version" : "1",
          "host" : "0.0.0.0",
          "domains" : "0009900025.000webhostapp.com",
          "message" : "\"0009900025.000webhostapp.com\",\"EMOTET\",\"low\"",
          "tags" : [
            "_grokparsefailure",
            "_geoip_lookup_failure"
          ]
        }
      },
      {
        "_index" : "logstash-doms-2018.09.24",
        "_type" : "logs",
        "_id" : "AWYLN0OsFACyq4pzg8DF",
        "_score" : 1.6554626,
        "_source" : {
          "severity" : "low",
          "path" : "/var/log/dsv.csv",
          "logstash_time" : 0.003000020980834961,
          "@timestamp" : "2018-09-24T10:54:17.952Z",
          "attack" : "EMOTET",
          "@version" : "1",
          "host" : "0.0.0.0",
          "domains" : "0009900025.000webhostapp.com",
          "message" : "0009900025.000webhostapp.com,EMOTET,low",
          "tags" : [
            "_grokparsefailure",
            "_geoip_lookup_failure"
          ]
        }
      }
    ]
  }
}

(Blason) #14

Is it because ${key} has special characters, like “.”?


(Philipp Ruland) #15

Well, derp. I forgot about the structure of an Elasticsearch response, sorry :smiley:

This would be your single value path: $.hits.hits[*]._source.domains… but… that won’t work.
Graylog expects one value to be returned, but your dataset returns three, since you have three objects at the node $.hits.hits[*].

[
  "0009900025.000webhostapp.com",
  "0009900025.000webhostapp.com",
  "0009900025.000webhostapp.com"
]
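A minimal Python sketch of why this happens, using a trimmed copy of the response from post #13 and plain list and dict access in place of a JSONPath engine (so it only mirrors the two paths, it does not evaluate them):

```python
# Trimmed copy of the search response from post #13 (three hits).
domain = "0009900025.000webhostapp.com"
response = {
    "hits": {
        "total": 3,
        "max_score": 1.9061548,
        "hits": [
            {"_score": 1.9061548, "_source": {"domains": domain, "attack": "EMOTET"}},
            {"_score": 1.6554626, "_source": {"domains": domain, "attack": "EMOTET"}},
            {"_score": 1.6554626, "_source": {"domains": domain, "attack": "EMOTET"}},
        ],
    }
}

# Counterpart of $.hits.hits[*]._source.domains: one entry per hit.
wildcard = [hit["_source"]["domains"] for hit in response["hits"]["hits"]]

# Counterpart of $._source.domains: there is no _source at the top level
# of a search response, which explains the earlier null results.
top_level = response.get("_source")

print(len(wildcard))
print(top_level)
```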

Possible solutions:

1: Always use first match

$.hits.hits[0]._source.domains and $.hits.hits[0]._source respectively.
This one will always return the values of the first item elasticsearch returns.

2: Use max score from Elasticsearch

$.hits.hits[?(@._score == $.hits.max_score)]._source.domains and $.hits.hits[?(@._score == $.hits.max_score)]._source respectively.
This one will return the object that Elasticsearch thinks is the most matching element.

3: Use asterisks and limit elasticsearch to 1

$.hits.hits[*]._source.domains and $.hits.hits[*]._source respectively.
Use this one with the From/Size parameter in the Elasticsearch query. Simply put &size=1 at the end:

http://192.168.1.15:9200/logstash-doms-*/_search?q=${key}&size=1
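The three options can be compared side by side in a small Python sketch. It uses the scores and timestamps from post #13 and plain list operations as stand-ins for the JSONPath expressions; slicing the list to one element is only an emulation of what `&size=1` would make Elasticsearch return.

```python
# Scores and timestamps taken from the response in post #13.
hits = [
    {"_score": 1.9061548,
     "_source": {"attack": "EMOTET", "severity": "low",
                 "@timestamp": "2018-09-24T11:04:13.691Z"}},
    {"_score": 1.6554626,
     "_source": {"attack": "EMOTET", "severity": "low",
                 "@timestamp": "2018-09-24T10:57:02.129Z"}},
    {"_score": 1.6554626,
     "_source": {"attack": "EMOTET", "severity": "low",
                 "@timestamp": "2018-09-24T10:54:17.952Z"}},
]
max_score = 1.9061548

# Option 1, like $.hits.hits[0]._source: always the first hit.
first = hits[0]["_source"]

# Option 2, like the max_score filter: the result is still a list,
# and it holds more than one item whenever several hits tie for the top score.
best = [h["_source"] for h in hits if h["_score"] == max_score]

# Option 3, like [*] combined with &size=1: only one hit comes back.
limited = [h["_source"] for h in hits[:1]]

print(first["@timestamp"])
print(len(best), len(limited))
```

With this data all three options pick the same hit. They differ once scores tie or the sort order changes, so option 1 or option 3 is the more predictable choice when exactly one value is needed.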

If you wanna get really fancy: Have a look into the scoring functionality of Elasticsearch and set different scoring weights for the different values like “EMOTET”. Then use option number 2 to have Elasticsearch aid you with choosing the most prominent threat scenario :slight_smile:

Greetings,
Philipp

PS: Use this to evaluate your JSONPath, it’s backed by the actual implementation that Graylog uses :slight_smile:
http://jsonpath.herokuapp.com


(Blason) #16

Awesome buddy and thanks a ton :slight_smile:
That worked. My next attempt will be to use it in an extractor. Let me see if that succeeds.


(Blason) #17

Hey @derPhlipsi

Just a quick one. That worked perfectly. Now, how do I add multiple fields using lookup tables? At the moment, if ${key} is matched then the attack name is displayed, but I have the severity as well. Can you confirm how I can achieve that with the same lookup table?


(Philipp Ruland) #18

I guess you mean you want to write the information that you query from your lookup table to the message itself, correct? :slight_smile:

That would be done in one of these ways:
NOTE: I changed the single value lookup to “$.hits.hits[0]._source.attack” so that the pipeline rule reads more fluently. Also, it really doesn’t make sense to look up a value and get the same value returned :smiley:

rule "addThreatInfo"

when
    contains("EMOTET,THREAT,SOMEOTHERVALUETOACTUPON", to_string(lookup_value("threatUrlLookup", $message.url))) //Use contains() to check against a list of values to act on.
then
    set_fields(fields: lookup("threatUrlLookup", $message.url));
end

However, I would use this approach:

rule "addThreatInfo"

when
    contains("EMOTET,THREAT,SOMEOTHERVALUETOACTUPON", to_string(lookup_value("threatUrlLookup", $message.url))) // Use contains() to check against a list of values to act on.
then
    let lookupResult = lookup("threatUrlLookup", $message.url); // Do Lookup.
    set_field("url_attack", lookupResult.attack); // Set every field individually to remove excess clutter from lookup.
    set_field("url_threat_severity", lookupResult.severity);
    set_field("url_threat_timestamp", lookupResult["@timestamp"]); // Use array accessor instead of dotted name to allow the @-symbol to be used.
end

NOTE: I do not know if you can use the lookupResult["name"] atm. I can’t find anything about it in the docs but the Graylog rule editor does not complain, so I guess it would work.

Step 1: Create this rule
Step 2: Create a pipeline and assign the rule.
Step 3: Profit. :slight_smile:
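For readers who want to see the enrichment logic outside of Graylog, here is a rough Python sketch of what the second rule does to a message. The `lookup` function, the `table` dict and the sample message are hypothetical stand-ins for the lookup table and the pipeline. As a deliberate simplification, the sketch enriches on any match instead of checking the attack name against a list as the `when` clause does.

```python
def lookup(table, key):
    """Hypothetical stand-in for the multi value lookup; returns {} on a miss."""
    return table.get(key, {})


def add_threat_info(message, table):
    """Copy selected lookup fields onto the message, one field at a time."""
    result = lookup(table, message.get("url"))
    if not result:
        return message  # no match: leave the message untouched
    enriched = dict(message)
    enriched["url_attack"] = result.get("attack")
    enriched["url_threat_severity"] = result.get("severity")
    enriched["url_threat_timestamp"] = result.get("@timestamp")
    return enriched


# Sample data modelled on the response in post #13.
table = {
    "0009900025.000webhostapp.com": {
        "attack": "EMOTET",
        "severity": "low",
        "@timestamp": "2018-09-24T11:04:13.691Z",
        "path": "/var/log/dsv.csv",  # clutter that is not copied over
    }
}

hit = add_threat_info({"url": "0009900025.000webhostapp.com"}, table)
miss = add_threat_info({"url": "example.org"}, table)

print(hit)
print(miss)
```

Setting each field individually keeps fields such as `path` or `tags` out of the message, which is the reason for preferring the second rule over `set_fields`.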

Greetings,
Philipp


(Blason) #19

Oh, hang on. What I mean is that I am already getting a proper result: if a domain match is found, the attack field is added to the logs. However, the same CSV file has a severity associated with each domain, so I am wondering whether it is possible to query and tag the severity field as well in a single lookup?

The attack field can be anything and cannot be specified in advance, whereas severity will always be Critical, High, Medium or Low. So I really doubt that filtering on the attack name would work.

TIA


(Philipp Ruland) #20

Sure, simply edit the lookup adapter as well as the pipeline to use the severity field instead of the domains/attack field :slight_smile:

This really only depends on your definition and needs. Graylog would (kind of) gracefully ignore non-existent values, but if your use case profits from using the severity then sure, use that :slight_smile:

Greetings,
Philipp