Using a CSV lookup table within a pipeline

Dear All,

I have a csv (UTF-8 encoded) file as shown below

"Command","Rank","Category"
"tasklist","1","Attacker Investigation"
"ver","2","Attacker Investigation"
"ipconfig","3","Attacker Investigation"
"net time","4","Attacker Investigation"
"systeminfo","5","Attacker Investigation"

I have set the data adapter key to be the Command field and the lookup to be the Rank,Category.

Now I have logs with a CommandLine parameter which shows the commands that have been run on my Windows system which are coming into Graylog via a GELF input.

I want to use a pipeline function similar too below to query the lookup table and add one / two fields in to the log if there is a match or not

hack_command_run = true / false for match no match
hack_command_lookup = lookup results

My function code is as follows.

// function to check  Commandline field in logs against a lookup table in a stream

rule "check_for_bad_command"
when
   // To save CPU cycles, only run on this field in stream
   has_field("CommandLine")
   then
	set_fields(hack_command_run,"false");
	let hack_command_lookup = lookup_value(to_string("Command", $message.CommandLine), " CommandLine");
	set_fields(hack_command_lookup);
	set_fields(hack_command_run,"true");
end

My expected add fields i want to achieve are

No match (tasklist)

hack_command_run=false

Match

hack_command_run=true
hack_command_lookup=1,Attack Investigation

Any helps appreciated

Jake

You’re missing double quotes around the field name.
http://docs.graylog.org/en/2.4/pages/pipelines/functions.html#set-field

What do you expect that function call to return?
http://docs.graylog.org/en/2.4/pages/pipelines/functions.html#to-string

The order of function parameters is relevant and I’m pretty sure yours is wrong.
http://docs.graylog.org/en/2.4/pages/pipelines/functions.html#lookup-value

Hi jochen,

I was debugging function and discovered a few errors. My code is now .

// function to check sysmon Commandline against a lookup table in a stream

rule "check_for_bad_command"
when
// To save CPU cycles, only run on this field in stream
has_field(“CommandLine”)
then
set_field(“hack_command_run”,“false”);
let hack_command_lookup = lookup(“Naughty_Commands”,“Command”, $message.CommandLine);
set_field(“hack_cmd_detected”, hack_command_lookup);
set_field(“hack_command_run”,“true”);
end

Ok mistakes in previous code

  1. set_fields should have been set_field with quotes
  2. let hack_command_lookup = lookup_value should have been let hack_command_lookup = lookup function which returns a multiple value according the summary in the rules editor.

I am expecting the lookup function to return two values, for example if the CommandLine field in the log had “tasklist” in it, the lookup should use the key value field Command in the CSV to return 1,Attack Investigation into the variable hack_command_lookup?

Question
Do I need to do the string conversion as the log looks to have a string in it?

Cheers

Jake

1 Like

No, not necessarily but it’s a safe-guard against unexpected types in the fields you’re referring to.

Hi Jochen,

I nearly have everything working except for one thing, they lookup key is returned not the value. I will explain

Logs are being sent in and have a conversation on field name so that sysmon_cmd_event holds the command run by a user for example ‘tasklist’

I have a pipeline rule that is the following.

// function to check sysmon_cmd_event against a lookup table in a stream

rule “check_for_uncommon_command”
when
// To save CPU cycles, only run on this field in stream

has_field(“sysmon_cmd_event”)

then

// set fields to default values
set_field(“UncommonCommand”,“false”);
set_field(“CommandCategory”, “None”);

// Perform Lookups and set fields

let hack_command_lookup = lookup("uncommon-commands","Command", $message.sysmon_cmd_event);
set_field("CommandCategory", hack_command_lookup);
set_field("UncommonCommand","true");

end

If I check the lookup table using the test lookup it appears fine

lookup-1

We get the correct value and I get an alert to slack which works.

However in the logs the CommandCategory has the lookup value., see below

The UncommonCommand" fielld is set correctly. I was expecting the “CommandCategory” field to be “Attacker Investigation” .

Have I made a mistake in the pipeline code? Why is the key value returned and not the ‘looked up’ value?

Cheers Jake

You should take another look at the parameters of the lookup() function and their correct order.

Jochen,

See mistake should be

let hack_command_lookup = lookup(“uncommon-commands”,“Command”,“None”, $message.sysmon_cmd_event);

This would lookup based on the key "Command " described in csv with a default message of “None” if no lookup is found.

So this would still return the command parameter so it must be changed to

let hack_command_lookup = lookup(“uncommon-commands”,“Category”,“None”, $message.sysmon_cmd_event);

Cheers Jake

Hi Jochen,

Got it figured out

// Perform Lookups and set fields

let hack_command_lookup = lookup("uncommon-commands",$message.sysmon_cmd_event,"None");
set_field("CommandCategory", hack_command_lookup);

Silly me!!!

lookup-correct

Jake

Hi Jochen,

Could I use the split function or similar to just leave “Attacker Investigation” rather than the key value pair?

Jake

You could use lookup_value() instead of lookup().

Hi Jochen,

When I use the lookup_value form in the following way, it get errors.

// Perform Lookups and set fields

let hack_command_lookup = lookup_value("uncommon-commands",$message.sysmon_cmd_event,"None");
set_field("CommandCategory", hack_command_lookup);

I get indexer and parser values.
{“type”:“mapper_parsing_exception”,“reason”:“object mapping for [CommandCategory] tried to parse field [CommandCategory] as object, but found a concrete value”}

Looking at the difference in the structure for the two functions we get

This indicates to me that they return, different object types , hence the error described as above.

Can you explain to me why, it is not working?

Jake

Try rotating indices (System/Indices/Index Set/Maintenance) and, if that works, create a custom index template which properly sets the type for the “CommandCategory” field.

Yes. lookup() returns a map (dictionary, object, whatever you’d like to call it) and lookup_value() returns a “flat” value.

Hi Jochen,

I rotated the indexes in (System/Indices/Index Set/Maintenance) using rotate active index option. Set the statement to use the lookup_value in the function and get the same errors on the new rotated index

a minute ago graylog_1 2b9ced70-f7b7-11e7-b971-06d71caf9804 {“type”:“mapper_parsing_exception”,“reason”:“object mapping for [CommandCategory] tried to parse field [CommandCategory] as object, but found a concrete value”}

Index is now graylog_1.

Looks like that does not work unless I have done something incorrectly.

Jake

First change the pipeline rule (or suspend it), then rotate the index. Otherwise, the new index will have the same type for the “CommandCategory” field (because your rule fills it accordingly).

Alternatively, use another (unused) field name.

Hi Jochen,

I also took a look at the indexes via the API after rotation using http://10.0.1.177:9000/api/system/indexer/overview and got the following JSON back

{
“deflector”: {
“current_target”: “graylog_1”,
“is_up”: true
},
“indexer_cluster”: {
“health”: {
“status”: “green”,
“shards”: {
“active”: 8,
“initializing”: 0,
“relocating”: 0,
“unassigned”: 0
}
},
“name”: “graylog”
},
“counts”: {
“events”: 17019
},
“indices”: {
“graylog_1”: {
“size”: {
“events”: 233,
“deleted”: 0,
“bytes”: 1116764
},
“range”: {
“index_name”: “graylog_1”,
“begin”: “1970-01-01T00:00:00.000Z”,
“end”: “1970-01-01T00:00:00.000Z”,
“calculated_at”: “2018-01-12T16:38:01.091Z”,
“took_ms”: 0
},
“is_deflector”: true,
“is_closed”: false,
“is_reopened”: false
},
“graylog_0”: {
“size”: {
“events”: 16786,
“deleted”: 0,
“bytes”: 26343100
},
“range”: {
“index_name”: “graylog_0”,
“begin”: “2017-11-30T22:49:07.000Z”,
“end”: “2018-01-12T16:37:53.000Z”,
“calculated_at”: “2018-01-12T16:38:31.282Z”,
“took_ms”: 49
},
“is_deflector”: false,
“is_closed”: false,
“is_reopened”: false
}
}
}

The graylog_1 index has a date time stamp of 1970 which is possibly a bug, do you want me to put it on Github?

Hi Jochen,

Is this relevant to the problem?

http://docs.graylog.org/en/2.4/pages/lookuptables.html

Lookupo Results Section of above

“Currently, the multi value can only be used in a pipeline rule when using the lookup() pipeline function.”

Jake

Hi Jochen,

Are we saying that the Command Category field has been mapped to the wrong field type in the default index?

Is that the reason that the changing the name may resolve it.

I will try changing the rule and then rotating the index and come back.

Jake

Hi Jochen,

Problem solved removing pipepline rule from stage, changing functions, rotating and re-enabling.

Cheers

Jake

That’s intentional since the index range (minimum and maximum timestamp) cannot be calculated for the currently write-active index. It’s basically a placeholder.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.