I know, right? On the 3.3.2 front, I’ve got to upgrade at some point, but it’s unfortunately become such a critical part of our business logic that it needs careful planning and execution; I can do the latter, but suck at the former.
debug()
isn’t an option; we’re pushing well over 2k messages per second through that pipeline normally, and spread across 3 nodes it’s an absolute pain in the yannowhat to work with. Anyway, I’ve put the pipeline rules for the potentially troubled pipeline down below; come to think of it, after seeing them again there are a few potential problem points… but for completeness’ sake, here they are:
Stage 0: 1 rule
rule "check if not json"
when
# these fields only appear when the JSON extractor on the input worked
has_field("short_message") == false && has_field("full_message") == false
then
end
Stage 1: 2 rules, match any
This rule is the one that can blow up because it’s not fully anchored; it’s a hold-over from some legacy log solutions that added a JSON-encoded block to the end of a plaintext line:
rule "extract data from message (GROK_META)"
when
regex("^\\{.*\\}$", to_string($message.message)).matches == false &&
regex(".*\\{.*?\\}$", to_string($message.message)).matches == true
then
let fields = grok("^\\[%{WORD:xxx_kernel_id}\\]\\[%{WORD:xxx_yyy_id}\\]\\[%{WORD:xxx_log_level}\\]\\[%{XXXTIMESTAMP:xxx_timestamp}\\]\\[%{XXXJAVACLASS:xxx_java_class}\\] %{GREEDYDATA:xxx_message} %{JSON:xxx_metadata_raw}$", to_string($message.message), true);
set_fields(fields);
set_field("message", to_string($message.xxx_message));
remove_field("xxx_message");
set_field("xxx_parsed_as", "GROK_META");
end
Patterns in this rule:
XXXJAVACLASS = .*?
XXXTIMESTAMP = %{MONTHDAY}/%{MONTHNUM}/%{YEAR}_%{HOUR}:%{MINUTE}:%{SECOND}:%{NUMBER}
JSON = {.*?}
And this rule, which basically duplicates the above but should only trigger when that encoded JSON block isn’t present. (I realise it could be done with a single rule, since it’d just add a blank field for the missing block, but the legacy stuff requires the field to be filled when present and the xxx_parsed_as field set to an appropriate value, and rules have no flow control, so… this was the solution.)
rule "extract data from message (GROK_NO_META)"
when
regex("^\\{.*\\}$", to_string($message.message)).matches == false &&
regex(".*\\{.*?\\}$", to_string($message.message)).matches == false
then
let fields = grok("\\[%{WORD:xxx_kernel_id}\\]\\[%{WORD:xxx_yyy_id}\\]\\[%{WORD:xxx_log_level}\\]\\[%{XXXTIMESTAMP:xxx_timestamp}\\]\\[%{XXXJAVACLASS:xxx_java_class}\\] %{GREEDYDATA:xxx_message}", to_string($message.message), true);
set_field("xxx_parsed_as", "GROK_NO_META");
# rest identical to rule above
end
Stage 2: 10 rules, match any
rule "fix warn/warning log level"
when
has_field("xxx_log_level") &&
contains(to_string($message.xxx_log_level), "WARNING")
then
set_field("xxx_log_level", "WARN");
end
rule "convert timestamp"
when
has_field("xxx_timestamp")
then
let ts = parse_date(value: to_string($message.xxx_timestamp), pattern: "dd/MM/YYYY_HH:mm:ss:SSS");
set_field("timestamp", ts);
remove_field("xxx_timestamp");
end
rule "check for action: XXX"
when
has_field("message") &&
contains(to_string($message.message), "WS_ReactiveHavetoRedactThis", true) &&
contains(to_string($message.message), "REST") &&
regex("(PUT|POST)", to_string($message.message)).matches == true &&
regex("XXX", to_string($message.message)).matches == true
then
set_field("is_action", true);
set_field("action_type", "XXX");
end
The above rule is duplicated twice more, just with different values for the XXX part.
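Since the only thing varying between those copies is the action marker, the whole when-clause boils down to one predicate; a hedged Python sketch (the sample message is made up, and action_type stands in for the redacted XXX value):

```python
import re

def is_action(message: str, action_type: str) -> bool:
    """Mirror the when-clause of the "check for action" rules."""
    return (
        # contains(..., true) in the rule means case-insensitive
        "ws_reactivehavetoredactthis" in message.lower()
        and "REST" in message                              # case-sensitive
        and re.search(r"(PUT|POST)", message) is not None
        and re.search(action_type, message) is not None
    )

# Hypothetical log line:
print(is_action("WS_ReactiveHavetoRedactThis REST POST /XXX", "XXX"))  # True
print(is_action("WS_ReactiveHavetoRedactThis REST GET /XXX", "XXX"))   # False
```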
rule "check sensor action"
when
has_field("message") && contains(to_string($message.message), "sensor action", true) && contains(to_string($message.message), "EVENT", true)
then
set_field("sensor_action", true);
end
rule "check for agenda switch"
when
has_field("message") && contains(to_string($message.message), "switching agenda", true)
then
set_field("agenda_switch", true);
end
rule "lookup brand"
when
has_field("xxx_yyy_id")
then
let result = lookup_value("xxx-yyy-brand", to_string($message.xxx_yyy_id), "redacteddefault");
set_field("xxx_brand", result);
end
And the last one:
rule "parse legacy json block"
when
has_field("xxx_metadata_raw")
then
let json = parse_json(to_string($message.xxx_metadata_raw));
let json_fields = select_jsonpath(json, {
x_logged_user: "$.X-Logged-User",
x_client_id: "$.X-Client-Id",
x_client_version: "$.X-Client-Version",
x_brand_id: "$.X-Brand-Id",
x_application_id: "$.X-Application-Id"
});
set_fields(json_fields);
end
This is the only pipeline that uses grok patterns, and generally catches about 40% of our total traffic.