Pipelines are filling the process buffer and journal

hello people,

I have a new issue with Graylog: my pipelines are filling the process buffer and the journal. I'm using another lookup table to match a particular IP to a name; apart from these pipeline rules and that lookup table, nothing else (such as extractors) is in use…

I have:
4 Graylog servers that do the message processing - 24 vCores and 64 GB of RAM each, with 30 GB dedicated to the Java heap.
1 Graylog master, which is also the web server - 15 vCores and 32 GB of RAM, with 16 GB for the Java heap.
(All 5 Graylog servers run MongoDB, with one primary.)
3 Elasticsearch data node servers - 24 vCores and 64 GB of RAM each, with 30 GB dedicated to the Java heap.
3 Elasticsearch master node servers - 10 vCores and 32 GB of RAM each, with 16 GB for the Java heap.

The message rate is around 15k messages per second,
and the rules that I have are as follows:

rule "ASA firewall rule"
// Runs on every message that carries a "message" field and tries 20 grok
// patterns against it, one after another. The named captures of each
// attempt are handed straight to set_fields, in the same order as the
// patterns are listed, so when several patterns capture the same field
// name the later call is the one applied last.
// NOTE(review): CISCO_APP, CISCO_ACTION, CISCO_DIRECTION, CISCO_REASON,
// CISCO_IPS, CISCO_XLATE_TYPE, CISCO_FLAGS, CISCO_SSL, CISCO_PORT and
// DST_ACCT are presumably custom grok patterns installed on this Graylog
// instance - confirm they exist before importing the rule elsewhere.
when
    has_field("message")
then
    // Snapshot the raw text once; every grok call below reads this value.
    let raw_message = to_string($message.message);

    // Application / client connections.
    set_fields(grok(pattern: "%{CISCO_APP:application} client %{DATA:src_interface}:%{IP:src_ip}(/%{INT:src_port})", value: raw_message, only_named_captures: true));

    // Actions with an explicit reason, local-host events, request/access events.
    set_fields(grok(pattern: "%{CISCO_ACTION:action}(?: %{CISCO_DIRECTION:direction})? %{WORD:protocol} from %{IP:src_ip}/%{INT:src_port} to %{IP:dst_ip}/%{INT:dst_port} due to %{CISCO_REASON:reason}", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_ACTION:action} local-host %{DATA:dst_interface}:%{IP:dst_ip}", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{WORD:protocol} (?:request|access) %{CISCO_ACTION:action} from %{IP:src_ip}/%{INT:src_port} to %{DATA:dst_interface}:%{IP:dst_ip}/%{INT:dst_port}", value: raw_message, only_named_captures: true));

    // IPS-requested actions and sequence-number mismatches.
    set_fields(grok(pattern: "%{CISCO_IPS:Detected_by_IPS} requested to %{CISCO_ACTION:action} %{WORD:protocol} packet from %{DATA:src_interface}:%{IP:src_ip}(/%{INT:src_port}) to %{DATA:dst_interface}:%{IP:dst_ip}(/%{INT:dst_port})", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_REASON:reason} from %{DATA:src_interface}:%{IP:src_ip}/%{INT:src_port} to %{DATA:dst_interface}:%{IP:dst_ip}/%{INT:dst_port} with different initial sequence number", value: raw_message, only_named_captures: true));

    // Per-interface actions and connection build/teardown records.
    set_fields(grok(pattern: "%{CISCO_ACTION:action}(?: %{CISCO_DIRECTION:direction})? %{WORD:protocol} from %{IP:src_ip}/%{INT:src_port} to %{IP:dst_ip}/%{INT:dst_port} on interface %{DST_ACCT:dst_interface}", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_ACTION:action}(?: %{CISCO_DIRECTION:direction})? %{WORD:protocol} connection %{INT:connection_id} for %{DATA:src_interface}:%{IP:src_ip}/%{INT:src_port} to %{DATA:dst_interface}:%{IP:dst_ip}/%{INT:dst_port} duration %{TIME:duration} bytes %{INT:bytes}", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_ACTION:action}(?: %{CISCO_DIRECTION:direction})? %{WORD:protocol} connection %{INT:connection_id} for %{DATA:src_interface}:%{IP:src_ip}/%{INT:src_port}( %{DATA:UNWANTED}) to %{DATA:dst_interface}:%{IP:dst_ip}/%{INT:dst_port}( %{DATA:UNWANTED})", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_ACTION:action}(?: %{CISCO_DIRECTION:direction})? %{WORD:protocol} connection for faddr %{IP:dst_ip}/%{INT:icmp_seq_num}(?:(%{DATA:user}))? gaddr %{IP:src_xlated_ip}/%{INT:icmp_code_xlated} laddr %{IP:src_ip}/%{INT:icmp_code}( (%{DATA:user}))?", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_ACTION:action}(?: %{CISCO_DIRECTION:direction})? %{WORD:protocol} connection %{INT:connection_id} for %{DATA:src_interface}:%{IP:src_ip}/%{INT:src_port}(?:(%{DATA:user}))? to %{DATA:dst_interface}:%{IP:dst_ip}/%{INT:dst_port} duration %{TIME:duration} bytes %{INT:bytes}( (%{DATA:user}))?", value: raw_message, only_named_captures: true));

    // Access-group decisions, address translations and TCP flag records.
    set_fields(grok(pattern: "%{CISCO_ACTION:action} %{WORD:protocol} src %{DATA:src_interface}:%{IP:src_ip}(/%{INT:src_port})?((%{DATA:user}))?dst %{DATA:dst_interface}:%{IP:dst_ip}(/%{INT:dst_port})?((%{DATA:user})) by access-group %{DATA:access_group}", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_ACTION:action} %{CISCO_XLATE_TYPE:xlate_type} %{WORD:protocol} translation from %{DATA:src_interface}:%{IP:src_ip}(/%{INT:src_port})?((%{DATA:user}))? to %{DATA:src_xlated_interface}:%{IP:src_xlated_ip}/%{INT:src_xlated_port}", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_ACTION:action} %{WORD:protocol} %{CISCO_REASON:reason} from %{IP:src_ip}/%{INT:src_port} to %{IP:dst_ip}/%{INT:dst_port} flags %{CISCO_FLAGS:tcp_flags}  on interface %{GREEDYDATA:interface}", value: raw_message, only_named_captures: true));

    // URL access and SSL handshake records.
    set_fields(grok(pattern: "%{IP:dst_ip} Accessed URL %{GREEDYDATA:url}?", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "Starting %{CISCO_SSL:ssl} with client %{DATA:dst_interface}:%{IP:dst_ip}(/%{INT:dst_port})", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_SSL:ssl} %{DATA:dst_interface}:%{IP:dst_ip}(/%{INT:dst_port})", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "%{CISCO_SSL:ssl} with client %{DATA:dst_interface}:%{IP:dst_ip}(/%{INT:dst_port})", value: raw_message, only_named_captures: true));

    // Port-less actions and access-list decisions for a named user.
    set_fields(grok(pattern: "%{CISCO_ACTION:action} %{WORD:protocol} from %{IP:src_ip} to %{IP:dst_ip}", value: raw_message, only_named_captures: true));
    set_fields(grok(pattern: "access-list %{DATA:acl_name} %{CISCO_ACTION:action} %{WORD:protocol} for user %{DATA:user} %{DATA:src_interface}/%{IP:src_ip}%{CISCO_PORT:src_port} -> %{DATA:dst_interface}/%{IP:dst_ip}%{CISCO_PORT:dst_port}", value: raw_message, only_named_captures: true));
end

rule "ASA_Lookup_Hostnames rule"
// For every message that has a "source" field, looks that value up in the
// "asa.look" lookup table and stores the result in the "ASA_host" field.
when
    has_field("source")
then
    // The lookup result is written as-is; no default value is supplied here.
    set_field("ASA_host", lookup_value("asa.look", $message.source));
end

rule "CyberArk rule"
// Parses the key=value extension part of a CyberArk event out of the
// "message" field and copies every named capture onto the message.
// Runs on any message that has a "message" field.
when
    has_field("message")
then

	let message_field = to_string($message.message);

	// Fix: the original pattern read "externalId%{DATA:ext_ID}= app=", i.e. the
	// "=" sat after the capture, so the whole pattern could only match when
	// externalId was empty. "externalId=%{DATA:ext_ID} app=" still matches the
	// empty case (ext_ID is then the empty string) and also matches events
	// that carry an external id.
	// NOTE(review): the two spaces before "msg=" are kept from the original -
	// confirm against a real event that the source really emits a double space.
	let cyberark_fields = grok(pattern: "act=%{DATA:act} suser=%{DATA:suser} fname=%{DATA:fname} dvc=%{DATA:dvc} shost=%{DATA:shost} dhost=%{DATA:dhost} duser=%{DATA:duser} externalId=%{DATA:ext_ID} app=%{DATA:app} reason=%{DATA:reason} cs1Label=%{DATA:cs1Label} cs1=%{DATA:cs1} cs2Label=%{DATA:cs2Label} cs2=%{DATA:cs2} cs3Label=%{DATA:cs3Label} cs3=%{DATA:cs3} cs4Label=%{DATA:cs4Label} cs4=%{DATA:cs4} cs5Label=%{DATA:cs5Label} cs5=%{DATA:cs5} cn1Label=%{DATA:cn1Label} cn1=%{DATA:cn1} cn2Label=%{DATA:cn2Label} cn2=%{DATA:cn2}  msg=%{GREEDYDATA:msg}", value: message_field, only_named_captures: true);

	// Only named captures are returned (only_named_captures: true), so this
	// adds exactly the fields named in the pattern above.
	set_fields(cyberark_fields);

end


rule "firepower sensors rule"
when
    has_field("message")
then

	let message_field = to_string($message.message);

	let fps0 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{WORD:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{WORD:ingress_intf}, IngressZone: %{WORD:ingress_zone}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, FileCount: %{DATA:host}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, HTTPResponse: %{NUMBER:http_response}, ReferencedHost: %{DATA:referenced_host}, URLCategory: %{DATA:url_category}, URLReputation: %{DATA:url_reputation}, URL: %{GREEDYDATA:url}", value: message_field, only_named_captures: true);
	let fps1 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{WORD:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{WORD:ingress_intf}, IngressZone: %{WORD:ingress_zone}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, UserAgent: %{DATA:user_agent}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, FileCount: %{DATA:host}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, HTTPResponse: %{NUMBER:http_response}, ReferencedHost: %{DATA:referenced_host}, URLCategory: %{DATA:url_category}, URLReputation: %{DATA:url_reputation}, URL: %{GREEDYDATA:url}", value: message_field, only_named_captures: true);
	let fps2 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{WORD:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{WORD:ingress_intf}, IngressZone: %{WORD:ingress_zone}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSRecordType: %{DATA:dns_record_type}, DNSResponseType: %{DATA:dns_response_type}, DNS_TTL: %{DATA:dns_ttl}, Sinkhole: %{WORD:sinkhole}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps3 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{WORD:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{WORD:ingress_intf}, IngressZone: %{WORD:ingress_zone}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, AccessControlRuleReason: %{DATA:access_ctrl_reason}, Prefilter Policy: %{DATA:prefilter_policy}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, SecIntMatchingIP: %{DATA:sec_int_match_IP}, IPReputationSICategory: %{DATA:reputation_IP_category}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps4 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{WORD:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{WORD:ingress_intf}, IngressZone: %{WORD:ingress_zone}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps5 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{WORD:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{WORD:ingress_intf}, IngressZone: %{WORD:ingress_zone}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, URLCategory: %{DATA:url_category}, URLReputation: %{DATA:url_reputation}, URL: %{GREEDYDATA:url}", value: message_field, only_named_captures: true);
	let fps6 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, ICMPType: %{DATA:icmp_type}, ICMPCode: %{DATA:icmp_code}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{WORD:ingress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps7 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{DATA:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{DATA:sinkhole}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps8 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, ICMPType: %{DATA:icmp_type}, ICMPCode: %{DATA:icmp_code}, TCPFlags: %{DATA:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{DATA:sinkhole}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps9 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, TCPFlags: %{DATA:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{DATA:sinkhole}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps10 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, AccessControlRuleReason: %{DATA:access_ctrl_reason}, Prefilter Policy: %{DATA:prefilter_policy}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, SecIntMatchingIP: %{DATA:sec_int_match_IP}, IPReputationSICategory: %{DATA:reputation_IP_category}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps11 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, AccessControlRuleReason: %{DATA:access_ctrl_reason}, Prefilter Policy: %{DATA:prefilter_policy}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{DATA:sinkhole}, IPReputationSICategory: %{DATA:reputation_IP_category}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps12 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, TCPFlags: %{DATA:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps13 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, ICMPType: %{DATA:icmp_type}, ICMPCode: %{DATA:icmp_code}, TCPFlags: %{DATA:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, AccessControlRuleReason: %{DATA:access_ctrl_reason}, Prefilter Policy: %{DATA:prefilter_policy}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, SecIntMatchingIP: %{DATA:sec_int_match_IP}, IPReputationSICategory: %{DATA:reputation_IP_category}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps14 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, ICMPType: %{DATA:icmp_type}, ICMPCode: %{DATA:icmp_code}, TCPFlags: %{DATA:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, AccessControlRuleReason: %{DATA:access_ctrl_reason}, Prefilter Policy: %{DATA:prefilter_policy}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, SecIntMatchingIP: %{DATA:sec_int_match_IP}, IPReputationSICategory: %{DATA:reputation_IP_category}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps15 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, ICMPType: %{DATA:icmp_type}, ICMPCode: %{DATA:icmp_code}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{WORD:ingress_intf}, IngressZone: %{WORD:ingress_zone}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, AccessControlRuleReason: %{DATA:access_ctrl_reason}, Prefilter Policy: %{DATA:prefilter_policy}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, IPReputationSICategory: %{DATA:reputation_IP_category}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps16 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSRery: %{DATA:reputation_IP_category}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps17 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, URLCategory: %{GREEDYDATA:url_category}", value: message_field, only_named_captures: true);
	let fps18 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{GREEDYDATA:sinkhole}", value: message_field, only_named_captures: true);
	let fps19 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, AccessControlRuleReason: %{DATA:access_ctrl_reason}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, UserAgent: %{DATA:user_agent}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, URLSICategory: %{DATA:url_security_category}, URLCategory: %{DATA:url_category}, URLReputation: %{DATA:url_reputation}, URL: %{GREEDYDATA:url}", value: message_field, only_named_captures: true);
	let fps20 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, ICMPType: %{DATA:icmp_type}, ICMPCode: %{DATA:icmp_code}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}", value: message_field, only_named_captures: true);
	let fps21 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSQuery: %{DATA:dns_query}, DNSRecordType: %{GREEDYDATA:dns_record_type}", value: message_field, only_named_captures: true);
	let fps22 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNry: %{DATA:dns_query}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	let fps23 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSQuery: %{GREEDYDATA:dns_query}", value: message_field, only_named_captures: true);
	let fps24 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, SecIntMatchingIP: %{DATA:sec_int_match_IP}, IPReputationSICategory: %{DATA:reputation_IP_category}, URLCategory: %{GREEDYDATA:url_category}", value: message_field, only_named_captures: true);
	let fps25 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, Dst: %{DATA:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleAction: %{DATA:access_ctrl_action}, Prefilter Policy: %{DATA:prefilter_policy}, UserName: %{DATA:user_name}, Client: %{DATA:client}, ApplicationProtocol: %{DATA:app_protocol}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, Context: %{DATA:context}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, URLCategory: %{GREEDYDATA:url_category}", value: message_field, only_named_captures: true);
	let fps26 = grok(pattern: "%{SYSLOGTIMESTAMP:timestamp} %{DATA:host} %{DATA:UNWANTED}: Protocol: %{DATA:protocol}, SrcIP: %{IP:src_ip}, OriginalClientIP: %{IP:original_IP}, DstIP: %{IP:dst_ip}, SrcPort: %{NUMBER:src_port}, DstPort: %{NUMBER:dst_port}, TCPFlags: %{WORD:tcp_flags}, IngressInterface: %{DATA:ingress_intf}, EgressInterface: %{DATA:egress_intf}, DE: %{DATA:primary_det_engine}, Policy: %{DATA:policy}, ConnectType: %{DATA:connect_type}, AccessControlRuleName: %{DATA:access_ctrl_name}, AccessControlRuleReason: %{DATA:access_ctrl_reason}, Prefilter Policy: %{DATA:prefilter_policy}, InitiatorPackets: %{DATA:initiator_packets}, ResponderPackets: %{DATA:responder_packets}, InitiatorBytes: %{DATA:initiator_bytes}, ResponderBytes: %{DATA:responder_bytes}, NAPPolicy: %{DATA:net_analysis_policy}, DNSResponseType: %{DATA:dns_response_type}, Sinkhole: %{WORD:sinkhole}, SecIntMatchingIP: %{DATA:sec_int_match_IP}, IPReputationSICategory: %{DATA:reputation_IP_category}, URLCategory: %{DATA:url_category}, URLReputation: %{GREEDYDATA:url_reputation}", value: message_field, only_named_captures: true);
	
	set_fields(fps0);
	set_fields(fps1);
	set_fields(fps2);
	set_fields(fps3);
	set_fields(fps4);
	set_fields(fps5);
	set_fields(fps6);
	set_fields(fps7);
	set_fields(fps8);
	set_fields(fps9);
	set_fields(fps10);
	set_fields(fps11);
	set_fields(fps12);
	set_fields(fps13);
	set_fields(fps14);
	set_fields(fps15);
	set_fields(fps16);
	set_fields(fps17);
	set_fields(fps18);
	set_fields(fps19);
	set_fields(fps20);
	set_fields(fps21);
	set_fields(fps22);
	set_fields(fps23);
	set_fields(fps24);
	set_fields(fps25);
	set_fields(fps26);
	
end
// Extracts fields from FMC log lines.
//
// Applies to every message that has a "message" field. The message text is
// matched against each of the stored grok patterns FMC0..FMC8 and the named
// captures of each result are written onto the message with set_fields, in
// order fmc0 -> fmc8.
//
// NOTE(review): all nine patterns are evaluated for every message that reaches
// this rule; consider narrowing the `when` condition (or routing by stream /
// an earlier stage) so that only FMC traffic pays for these grok calls.
rule "FMC rule"
when
    has_field("message")
then

	// Convert the message text once and reuse it for every grok call below.
	let message_field = to_string($message.message);

	// Named arguments match the calling style of the other rules in this file.
	let fmc0 = grok(pattern: "%{FMC0}", value: message_field, only_named_captures: true);
	let fmc1 = grok(pattern: "%{FMC1}", value: message_field, only_named_captures: true);
	let fmc2 = grok(pattern: "%{FMC2}", value: message_field, only_named_captures: true);
	let fmc3 = grok(pattern: "%{FMC3}", value: message_field, only_named_captures: true);
	let fmc4 = grok(pattern: "%{FMC4}", value: message_field, only_named_captures: true);
	let fmc5 = grok(pattern: "%{FMC5}", value: message_field, only_named_captures: true);
	let fmc6 = grok(pattern: "%{FMC6}", value: message_field, only_named_captures: true);
	let fmc7 = grok(pattern: "%{FMC7}", value: message_field, only_named_captures: true);
	let fmc8 = grok(pattern: "%{FMC8}", value: message_field, only_named_captures: true);
	
	// Write the captures of each grok result onto the message.
	set_fields(fmc0);
	set_fields(fmc1);
	set_fields(fmc2);
	set_fields(fmc3);
	set_fields(fmc4);
	set_fields(fmc5);
	set_fields(fmc6);
	set_fields(fmc7);
	set_fields(fmc8);

end


// Extracts fields from FPH log lines.
//
// Applies to every message that has a "message" field: the message text is
// matched against the stored grok pattern FPH00 and the named captures are
// written onto the message.
rule "FPH rule"
when
    has_field("message")
then

	// Convert the message text once; the grok call below reuses it.
	let message_field = to_string($message.message);

	// Named arguments match the calling style of the other rules in this file.
	let fph00 = grok(pattern: "%{FPH00}", value: message_field, only_named_captures: true);

	set_fields(fph00);

end
// Extracts fields from SonicWall log lines.
//
// Applies to every message that has a "message" field. The message text is
// matched against each of the stored grok patterns SW0..SW26 and SW99, and the
// named captures of each result are written onto the message with set_fields,
// in the order sw0 -> sw26, then sw99.
//
// NOTE(review): this is 28 grok evaluations per message. Every SWn pattern
// begins with "fw=%{IP:fw_ip} pri=", so a cheap pre-check on that prefix (in
// the `when` clause or an earlier stage) looks like a way to skip the grok
// work for non-SonicWall traffic - confirm the effect on stage matching first.
rule "SonicWall rule"
when
    has_field("message")
then

	// Convert the message text once and reuse it for every grok call below.
	let message_field = to_string($message.message);

	// Named arguments match the calling style of the other rules in this file.
	let sw0 = grok(pattern: "%{SW0}", value: message_field, only_named_captures: true);
	let sw1 = grok(pattern: "%{SW1}", value: message_field, only_named_captures: true);
	let sw2 = grok(pattern: "%{SW2}", value: message_field, only_named_captures: true);
	let sw3 = grok(pattern: "%{SW3}", value: message_field, only_named_captures: true);
	let sw4 = grok(pattern: "%{SW4}", value: message_field, only_named_captures: true);
	let sw5 = grok(pattern: "%{SW5}", value: message_field, only_named_captures: true);
	let sw6 = grok(pattern: "%{SW6}", value: message_field, only_named_captures: true);
	let sw7 = grok(pattern: "%{SW7}", value: message_field, only_named_captures: true);
	let sw8 = grok(pattern: "%{SW8}", value: message_field, only_named_captures: true);
	let sw9 = grok(pattern: "%{SW9}", value: message_field, only_named_captures: true);
	let sw10 = grok(pattern: "%{SW10}", value: message_field, only_named_captures: true);
	let sw11 = grok(pattern: "%{SW11}", value: message_field, only_named_captures: true);
	let sw12 = grok(pattern: "%{SW12}", value: message_field, only_named_captures: true);
	let sw13 = grok(pattern: "%{SW13}", value: message_field, only_named_captures: true);
	let sw14 = grok(pattern: "%{SW14}", value: message_field, only_named_captures: true);
	let sw15 = grok(pattern: "%{SW15}", value: message_field, only_named_captures: true);
	let sw16 = grok(pattern: "%{SW16}", value: message_field, only_named_captures: true);
	let sw17 = grok(pattern: "%{SW17}", value: message_field, only_named_captures: true);
	let sw18 = grok(pattern: "%{SW18}", value: message_field, only_named_captures: true);
	let sw19 = grok(pattern: "%{SW19}", value: message_field, only_named_captures: true);
	let sw20 = grok(pattern: "%{SW20}", value: message_field, only_named_captures: true);
	let sw21 = grok(pattern: "%{SW21}", value: message_field, only_named_captures: true);
	let sw22 = grok(pattern: "%{SW22}", value: message_field, only_named_captures: true);
	let sw23 = grok(pattern: "%{SW23}", value: message_field, only_named_captures: true);
	let sw24 = grok(pattern: "%{SW24}", value: message_field, only_named_captures: true);
	let sw25 = grok(pattern: "%{SW25}", value: message_field, only_named_captures: true);
	let sw26 = grok(pattern: "%{SW26}", value: message_field, only_named_captures: true);
	let sw99 = grok(pattern: "%{SW99}", value: message_field, only_named_captures: true);

	// Write the captures of each grok result onto the message.
	set_fields(sw0);
	set_fields(sw1);
	set_fields(sw2);
	set_fields(sw3);
	set_fields(sw4);
	set_fields(sw5);
	set_fields(sw6);
	set_fields(sw7);
	set_fields(sw8);
	set_fields(sw9);
	set_fields(sw10);
	set_fields(sw11);
	set_fields(sw12);
	set_fields(sw13);
	set_fields(sw14);
	set_fields(sw15);
	set_fields(sw16);
	set_fields(sw17);
	set_fields(sw18);
	set_fields(sw19);
	set_fields(sw20);
	set_fields(sw21);
	set_fields(sw22);
	set_fields(sw23);
	set_fields(sw24);
	set_fields(sw25);
	set_fields(sw26);
	set_fields(sw99);
	
end

// Splits a Zscaler key=value log line into individual message fields.
//
// Runs for every message that has a "message" field. A single inline grok
// pattern (login=... through referer=...) is applied to the message text and
// its named captures are written onto the message.
rule "Zscaler rule"
when
    has_field("message")
then

	// Text of the incoming message, as a string.
	let raw_text = to_string($message.message);

	// Only named captures are kept, so helper sub-patterns do not become fields.
	let zscaler_fields = grok(
		value: raw_text,
		pattern: "login=%{DATA:user} dname=%{DATA:web_hostname} dip=%{IP:dst_ip} sip=%{IP:src_ip} natPublicIp=%{IP:translated_ip} url=%{DATA:url} ua=%{DATA:user_agent} module=%{DATA:module} proto=%{DATA:protocol} action=%{DATA:action} reason=%{DATA:reason} appName=%{DATA:app_Name} appClass=%{DATA:app_Class} fileType=%{DATA:file_Type} malwareCat=%{DATA:malware_Cat} malwareClass=%{DATA:malware_Class} threatName=%{DATA:threat_Name} riskScore=%{INT:risk_Score} DLPEng=%{DATA:dlp_Eng} DLPDict=%{DATA:dlp_Dict} location=%{DATA:location} dept=%{DATA:dept} reqMethod=%{DATA:req_Method} respCode=%{INT:resp_Code} respVersion=%{DATA:resp_Version} urlClass=%{DATA:url_Class} urlSuperCat=%{DATA:url_Super_Cat} urlCat=%{DATA:url_Cat} referer=%{GREEDYDATA:referer}",
		only_named_captures: true
	);

	set_fields(zscaler_fields);

end

my grok patterns are as follows

BASE10NUM	(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))	
BASE16FLOAT	\b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b	
BASE16NUM	(?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))	
CISCO_ACTION	Built|Teardown|Deny|Denied|denied|requested|permitted|denied by ACL|discarded|est-allowed|Dropping|created|deleted|drop	
CISCO_APP	SSL	
CISCO_DIRECTION	Inbound|inbound|Outbound|outbound	
CISCO_FLAGS	FIN ACK|RST|RST ACK|ACK|SYN ACK|FIN ACK|PSH ACK	
CISCO_INTERVAL	first hit|%{INT}-second interval	
CISCO_IPS	IPS	
CISCO_PORT	\(%{INT}\)	
CISCO_REASON	Duplicate TCP SYN|LU allocate xlate failed|\(no connection\)|Failed to locate egress interface|Invalid transport field|No matching connection|DNS Response|DNS Query|(?:%{WORD}\s*)*	
CISCO_SSL	SSL handshake|SSL client|SSL session	
CISCO_XLATE_TYPE	static|static-nat|dynamic	
CISCOMAC	(?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})	
CISCOTIMESTAMP	%{MONTH} +%{MONTHDAY}(?: %{YEAR})? %{TIME}	
COMBINEDAPACHELOG	%{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}	
COMMONAPACHELOG	%{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)	
COMMONMAC	(?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})	
DATA	.*?	
DATE	%{DATE_US}|%{DATE_EU}	
DATE_EU	%{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}	
DATE_US	%{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}	
DATESTAMP	%{DATE}[- ]%{TIME}	
DATESTAMP_EVENTLOG	%{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}	
DATESTAMP_OTHER	%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}	
DATESTAMP_RFC2822	%{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}	
DATESTAMP_RFC822	%{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}	
DAY	(?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)	
DST_ACCT	[a-zA-Z0-9._-]+	
DST_HOSTNAME	\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)	
EMAILADDRESS	%{EMAILLOCALPART}@%{HOSTNAME}	
EMAILLOCALPART	[a-zA-Z][a-zA-Z0-9_.+-=:]+	
FIREPOWER_CATEGORY	CnC Connected|Impact 2 Attack	
FIREPOWER_CHARACTERS	\<\*\-|\-\*\>	
FIREPOWER_CLASSIFICATION	Attempted Administrator Privilege Gain|Detection of a Non-Standard Protocol or Event|Not Suspicious Traffic|Potentially Bad Traffic|Access to a Potentially Vulnerable Web Application|Attempted Information Leak|Misc Activity|Generic Protocol Command Decode|Web Application Attack|A Network Trojan was Detected|Attempted User Privilege Gain	
FIREPOWER_COUNTRY	%{WORD}|united states|united kingdom|slovak republic|slovakia \(slovak republic\)	
FIREPOWER_EVENT	Security Intelligence Event - CnC|Impact 2 Intrusion Event - attempted-user|Impact 2 Intrusion Event - attempted-admin|Impact 2 Intrusion Event - web-application-attack	
FIREPOWER_HOST	Load Balancer	
FIREPOWER_IMPACT	Currently Not Vulnerable|Potentially Vulnerable|Vulnerable	
FIREPOWER_IOC	Host IOC Set|Host Type Changed to Network Device	
FIREPOWER_IPORHOSTNAME	(?:%{IPv4}|%{HOSTNAME})	
FIREPOWER_MESSAGE	Gobbles exploit|Protocol mismatch|BARE BYTE UNICODE ENCODING|OVERSIZE REQUEST-URI DIRECTORY|BARE BYTE UNICODE ENCODING|Connection-oriented DCE/RPC|SMB - Bad NetBIOS Session Service session type|Connection-oriented DCE/RPC - Invalid major version|SERVER-IIS encoding access|SERVER-APACHE Apache Struts wildcard matching OGNL remote code execution attempt|OS-WINDOWS Microsoft Windows SMB anonymous session IPC share access attempt|APP-DETECT failed FTP login attempt|Attempted command buffer overflow|Invalid FTP command|SERVER-IIS register.asp access|Telnet command on FTP command channel|SQL 1 = 1 - possible sql injection attempt|POLICY-OTHER Microsoft Windows Terminal server request attempt|INDICATOR-COMPROMISE Suspicious .cc dns query TEST|PROTOCOL-DNS domain not found containing random-looking hostname - possible DGA detected|SERVER-IIS tilde character file name discovery attempt|Attempted response buffer overflow|SMB - NetBIOS data length less than SMB header length|SQL use of sleep function with and - likely SQL injection|SERVER-IIS view source via translate header|BLACKLIST DNS reverse lookup response for known malware domain spheral.ru - Win.Trojan.Glupteba	
FIREPOWER_NONSENSE	(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))):(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))):(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))	
FIREPOWER_PROTOCOL	tcp|udp	
FMC0	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: \[%{NUMBER:UNWANTED}:%{NUMBER:UNWANTED}:%{NUMBER:UNWANTED}\] %{WORD:app_protocol}: %{DATA:message_firepower} \[Impact: %{DATA:impact}\] From \"%{DATA:sensor_IP_or_name}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} \[Classification: %{DATA:classification}\] \[Priority: %{NUMBER:priority}\] \{%{FIREPOWER_PROTOCOL:transport_prot}\} %{IPV4:src_ip}:%{NUMBER:src_port} \(%{DATA:src_country}\)->%{IPV4:dst_ip}:%{NUMBER:dst_port} \(%{GREEDYDATA:dst_country}\)	
FMC1	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: \[%{NUMBER:UNWANTED}:%{NUMBER:UNWANTED}:%{NUMBER:UNWANTED}\] %{DATA:message_firepower} \[Impact: %{DATA:impact}\] From \"%{DATA:sensor_IP_or_name}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} \[Classification: %{FIREPOWER_CLASSIFICATION:classification}\] \[Priority: %{NUMBER:priority}\] \{%{FIREPOWER_PROTOCOL:transport_prot}\} %{IPV4:src_ip}:%{NUMBER:src_port} \(%{DATA:src_country}\)->%{IPV4:dst_ip}:%{NUMBER:dst_port} \(%{GREEDYDATA:dst_country}\)	
FMC2	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: %{FIREPOWER_CHARACTERS:UNWANTED} %{FIREPOWER_IOC:UNWANTED} From \"%{DATA:sensor_IP_or_name}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} %{FIREPOWER_CHARACTERS:UNWANTED} IP Address: %{IPV4:src_ip} Category: %{DATA:category}; Event Type: %{GREEDYDATA:event}	
FMC3	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: \[%{NUMBER:UNWANTED}:%{NUMBER:UNWANTED}:%{NUMBER:UNWANTED}\] %{DATA:message_firepower} \[Impact: %{DATA:impact}\] From \"%{DATA:sensor_IP_or_name}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} \[Classification: %{DATA:classification}\] \[Priority: %{NUMBER:priority}\] \{%{FIREPOWER_PROTOCOL:transport_prot}\} %{IPV4:src_ip}:%{NUMBER:src_port} \(%{DATA:src_country}\)->%{IPV4:dst_ip}:%{NUMBER:dst_port} \(%{GREEDYDATA:dst_country}\)	
FMC4	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: \[%{NUMBER:UNWANTED}:%{NUMBER:UNWANTED}:%{NUMBER:UNWANTED}\] \"%{DATA:message_firepower}\" \[Impact: %{DATA:impact}\] From \"%{DATA:sensor_IP_or_name}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} \[Classification: %{DATA:classification}\] \[Priority: %{NUMBER:priority}\] \{%{FIREPOWER_PROTOCOL:transport_prot}\} %{IPV4:src_ip}:%{NUMBER:src_port} \(%{DATA:src_country}\)->%{IPV4:dst_ip}:%{NUMBER:dst_port} \(%{GREEDYDATA:dst_country}\)	
FMC5	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: %{FIREPOWER_CHARACTERS:UNWANTED} %{DATA:UNWANTED} From \"%{DATA:sensor_IP_or_name}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} %{FIREPOWER_CHARACTERS:UNWANTED} IP Address: %{IPV4:src_IP} Category: %{DATA:category}; Event Type: %{GREEDYDATA:event}	
FMC6	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: %{FIREPOWER_CHARACTERS:UNWANTED} %{DATA:UNWANTED} From \"%{DATA:sensor_IP_or_name}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} %{FIREPOWER_CHARACTERS:UNWANTED} IP Address: %{IPV4:src_ip} Category: %{DATA:category}; Event Type: %{GREEDYDATA:event}	
FMC7	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: %{FIREPOWER_CHARACTERS:UNWANTED} %{DATA:UNWANTED} From \"%{DATA:sensor_IP_or_name}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} %{FIREPOWER_CHARACTERS:UNWANTED} IP Address: %{IPV4:src_ip} Host Type: %{GREEDYDATA:host_type}	
FMC8	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: \<\*\- Host IOC Set From \"%{DATA:sensor}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} \-\*\> IP Address: %{IPV4:src_ip} Category: %{DATA:category}; Event Type: %{GREEDYDATA:event}	
FMC9	%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:tag}: \<\*\- Host IOC Set From \"%{DATA:sensor}\" at %{DAY:UNWANTED} %{MONTH:UNWANTED} %{MONTHDAY:UNWANTED} %{TIME:UNWANTED} %{YEAR:UNWANTED} %{WORD:UNWANTED} \-\*\> IP Address: %{IPV4:src_ip} Category: %{DATA:category}; Event Type: %{GREEDYDATA:event}	
FPH00	%{SYSLOGTIMESTAMP:sensor_timestamp} %{DATA:UNWANTED} %{WORD:UNWANTED}: HMNOTIFY: %{DATA:notification} \(Sensor %{DATA:sensor}\): Severity: %{WORD:severity}: %{GREEDYDATA:cause}	
GREEDYDATA	.*	
HOSTNAME	\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)	
HOSTPORT	%{IPORHOST}:%{POSINT}	
HOUR	(?:2[0123]|[01]?[0-9])	
HTTPD_ERRORLOG	%{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}	
HTTPD20_ERRORLOG	\[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg}	
HTTPD24_ERRORLOG	\[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel}\] \[pid %{POSINT:pid}:tid %{NUMBER:tid}\]( \(%{POSINT:proxy_errorcode}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message}	
HTTPDATE	%{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}	
HTTPDERROR_DATE	%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}	
HTTPDUSER	%{EMAILADDRESS}|%{USER}	
INT	(?:[+-]?(?:[0-9]+))	
IP	(?:%{IPV6}|%{IPV4})	
IPORHOST	(?:%{IP}|%{HOSTNAME})	
IPV4	(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9])	
IPV6	((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?	
ISO8601_SECOND	(?:%{SECOND}|60)	
ISO8601_TIMEZONE	(?:Z|[+-]%{HOUR}(?::?%{MINUTE}))	
LOGLEVEL	([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)	
MAC	(?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})	
MINUTE	(?:[0-5][0-9])	
MONTH	\b(?:Jan(?:uary|uar)?|Feb(?:ruary|ruar)?|M(?:a|ä)?r(?:ch|z)?|Apr(?:il)?|Ma(?:y|i)?|Jun(?:e|i)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|O(?:c|k)?t(?:ober)?|Nov(?:ember)?|De(?:c|z)(?:ember)?)\b	
MONTHDAY	(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])	
MONTHNUM	(?:0?[1-9]|1[0-2])	
MONTHNUM2	(?:0[1-9]|1[0-2])	
MSG	.*	
NONNEGINT	\b(?:[0-9]+)\b	
NOTSPACE	\S+	
NUMBER	(?:%{BASE10NUM})	
PATH	(?:%{UNIXPATH}|%{WINPATH})	
POSINT	\b(?:[1-9][0-9]*)\b	
PROG	[\x21-\x5a\x5c\x5e-\x7e]+	
QS	%{QUOTEDSTRING}	
QUOTEDSTRING	(?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))	
SAFE_NAME	[a-zA-Z0-9._-]+	
SECOND	(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)	
SENSOR_TIME	%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR} %{INT}	
SOURCEFIRE_POLICY	AUS \(Cyrus One\) AUTO ERP SILO Policy|IRV \(ATT COLO\) Policy	
SOURCEFIRE_RULE	EMS J\.SPRAT DETECT ANALYSE	
SOURCEFIRE_USER	%{WORD}|No Authentication Required	
SPACE	\s*	
SW0	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED}	
SW1	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port} dst=%{IP:dst_ip}:%{NUMBER:dst_port} note=\"%{DATA:note}\"	
SW10	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port} proto=%{WORD:prot}/%{DATA:UNWANTED}	
SW11	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} note=\"%{DATA:note}\"	
SW12	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED}	
SW13	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} op=%{DATA:UNWANTED} sent=%{DATA:UNWANTED} rcvd=%{DATA:UNWANTED} result=%{DATA:UNWANTED} dstname=%{DATA:dst_name} arg=%{DATA:arg} code=%{DATA:UNWANTED} Category=\"%{DATA:category}\"	
SW14	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port} dst=%{IP:dst_ip}:%{NUMBER:dst_port} note=\"%{DATA:note}\"	
SW15	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port} dst=%{IP:dst_ip}:%{NUMBER:dst_port}	
SW16	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" f=%{NUMBER:UNWANTED} n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} sent=%{DATA:UNWANTED} rcvd=%{DATA:UNWANTED}	
SW17	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" f=%{NUMBER:UNWANTED} sess=%{DATA:sess} n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} sent=%{DATA:UNWANTED} rcvd=%{DATA:UNWANTED}	
SW18	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" sess=%{DATA:sess} n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{IPORHOST:prot} note=\"%{DATA:note}\"	
SW19	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" f=%{NUMBER:UNWANTED} sess=%{DATA:sess} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} sent=%{DATA:UNWANTED}	
SW2	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port} dst=%{IP:dst_ip}:%{NUMBER:dst_port}	
SW20	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{IPORHOST:prot}	
SW21	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" dur=%{NUMBER:UNWANTED} n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED}	
SW22	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} op=%{DATA:UNWANTED} sent=%{DATA:UNWANTED} result=%{DATA:UNWANTED} dstname=%{DATA:dst_name} arg=%{DATA:arg} vpnpolicy=\"%{DATA:vpn_policy}\" code=%{DATA:UNWANTED} Category=\"%{DATA:category}\"	
SW23	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" sess=%{DATA:sess} n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED}	
SW24	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" sess=%{DATA:sess} n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} note=\"%{DATA:note}\"	
SW25	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port} dst=%{IP:dst_ip}:%{NUMBER:dst_port}	
SW26	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} op=%{DATA:UNWANTED} sent=%{DATA:UNWANTED} result=%{DATA:UNWANTED} dstname=%{DATA:dst_name} code=%{DATA:UNWANTED} Category=\"%{DATA:category}\"	
SW3	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} op=%{DATA:UNWANTED} sent=%{DATA:UNWANTED} rcvd=%{DATA:UNWANTED} result=%{DATA:UNWANTED} dstname=%{DATA:dst_name}:%{NUMBER:dst_port} arg=%{DATA:arg} code=%{DATA:UNWANTED} Category=\"%{DATA:category}\"	
SW4	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" f=%{NUMBER:UNWANTED} sess=%{DATA:sess} n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} sent=%{NUMBER:UNWANTED} rcvd=%{NUMBER:UNWANTED} vpnpolicy=\"%{DATA:vpn_policy}\"	
SW5	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" sess=%{DATA:sess} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED}	
SW6	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" f=%{NUMBER:UNWANTED} n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} sent=%{NUMBER:UNWANTED} vpnpolicy=\"%{DATA:vpn_policy}\"	
SW7	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" f=%{NUMBER:UNWANTED} n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} sent=%{NUMBER:UNWANTED}	
SW8	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" f=%{NUMBER:UNWANTED} sess=%{DATA:sess} n=%{NUMBER:UNWANTED} usr=\"%{DATA:user}\" src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port}:%{DATA:UNWANTED} proto=%{WORD:prot}/%{DATA:UNWANTED} sent=%{NUMBER:UNWANTED} vpnpolicy=\"%{DATA:vpn_policy}\"	
SW9	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED} src=%{IP:src_ip}:%{NUMBER:src_port}:%{DATA:UNWANTED} dst=%{IP:dst_ip}:%{NUMBER:dst_port} note=\"%{DATA:note}\"	
SW99	fw=%{IP:fw_ip} pri=%{NUMBER:UNWANTED} c=%{NUMBER:UNWANTED} m=%{NUMBER:UNWANTED} msg=\"%{DATA:msg}\" n=%{NUMBER:UNWANTED}	
SYSLOGBASE	%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:	
SYSLOGFACILITY	<%{NONNEGINT:facility}.%{NONNEGINT:priority}>	
SYSLOGHOST	%{IPORHOST}	
SYSLOGPROG	%{PROG:program}(?:\[%{POSINT:pid}\])?	
SYSLOGTIMESTAMP	%{MONTH} +%{MONTHDAY} %{TIME}	
SYSLOGTIMESTAMP_1	%{MONTH} %{MONTHDAY} %{TIME}	
TIME	(?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])	
TIMESTAMP_ISO8601	%{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?	
TTY	(?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))	
TZ	(?:[PMCE][SD]T|UTC)	
UNIXPATH	(/([\w_%!$@:.,~-]+|\\.)*)+	
URI	%{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?	
URIHOST	%{IPORHOST}(?::%{POSINT:port})?	
URIPARAM	\?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]<>]*	
URIPATH	(?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+	
URIPATHPARAM	%{URIPATH}(?:%{URIPARAM})?	
URIPROTO	[A-Za-z]+(\+[A-Za-z+]+)?	
USER	%{USERNAME}	
USERNAME	[a-zA-Z0-9._-]+	
UUID	[A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}	
WINDOWSMAC	(?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})	
WINPATH	(?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+	
WORD	\b\w+\b	
YEAR	(?>\d\d){1,2}

As soon as I enable the Firepower sensors rule, my process buffer goes to 100%, and the disk journal fills up as well…

my buffers and batch size are:
output_batch_size = 20000
processbuffer_processors = 20
outputbuffer_processors = 6

This was the sweet spot I managed to find for my setup; for processbuffer_processors I previously tried 12, 16 and 18, and it was even worse…

any idea of how i can enable the firepower sensors rule without killing the processbuffer and the disk journal???

thanks,
Marius.

did you understand what you are doing in the processing rules?

You run every GROK - which is a REGEX - on every incoming message.

The idea of the processing pipeline is to minimize the number of messages that you run the patterns on. That means you only run the heavy patterns if you really need to.

If you really want to keep up that way of working, the only option you have - add more powerful CPU and more of them to Graylog.

thanks for the reply Jan.

unfortunately i can’t add more CPU, but i think i could add extra servers that would parse those messages…

ASA rule has to GROK every message that comes in 20 times.
SonicWall rule has to GROK every message that comes in 27 times.
Firepower rule has to GROK every message that comes in 26 times.
FMC rule has to GROK every message that comes in 8 times.

Let’s assume you have separate pipelines for ASA/Firepower/Sonicwall/FMC and the average number of GROKs is 20 for all 15k/second messages… you would have to scale up to compute ~300,000 GROKs per second. Yow.

Rather than scaling up your hardware, you can eliminate a ton of processing by creating more precise rules as well as splitting rules into pipeline staging. For instance, you could GROK/REGEX the key differentiators of the firepower message into a field and then have subsequent rules/stages kick off based on that differentiator field and do the processing of breaking out the fields you want.

set up the first rules/stages to process the items that happen the most so that processing on a message stops before you get to evaluating the rare stuff.

2 Likes

thanks for the idea :wink:

ok, i’ll try to put the grok patterns that happen the most at stage 0 and the rare ones in stage 1.
i’ll let you know about the result

i have tested it, and created a couple more stages, but it still looks like i might be forced to scale in the future, because sometimes i get something like 35k msg/s and then the process buffer gets filled to almost 40%, and the servers manage to clear it when they don’t receive that many messages.

if you run all of the above mentioned GROKS on each incoming message your Graylog would need to run 83 heavy regex rules on that message.

You can throw more hardware at the problem but it would be WAY smarter to improve your rules …

i have tried that before and the funny thing is that i have managed to reduce it only by 3 rules in order to get the same result

I am curious as to how you have re-adjusted your rules.

Assuming that all the data is coming in on one input (I might have different inputs per device type… but if you don’t want to…) the first pipeline I would create (without knowing many specifics) would be a series of rules set up with something like “if hostname is [any ASA machine] Route_to_stream(“ASA_Rules”)”, then another rule to route Firepower data to its stream, SonicWall to its stream… Then you can cascade the information through the receiving stream with the most chatty stuff happening first (dns, user streams/ssl etc) and the rare stuff happening in subsequent stages, or route it again to a different stream before further unneeded processing happens to it.

If you are grok’ing the same message 26 times (bleh) and THEN using set_fields() on each grok of the same message… that is hugely inefficient too… each non-unique field is re-set to the same thing as much as 26 times…

the thing is that i’m using different inputs for every type of log messages i’m receiving in graylog…

more than this the way i see it graylog is trying to parse the messages all at the same time, and not doing it based on input… while using pipelines

Either set up extractors on the input - your own or from the marketplace (https://marketplace.graylog.org/addons?tag=firepower) - or start with a rule that uses one GROK to parse all common fields, then use one of those fields as a differentiator (i.e. When to_string($message.policy) == “dns_traffic_allowed”) to choose when to GROK the rest of the fields out and set them. This way you are processing comparisons and only 2-3 GROKs on each message rather than 26 heavy GROK calculations followed by 26 repetitive field insertions. Use pipeline stages to sequence what happens.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.