paloalto日志转发到elk2


改进

上次用 rsyslog 对 paloalto 的日志进行存储(文件),之后通过查看资料发现 Logstash 可以完全代替 rsyslog 的功能,直接监听端口就可以,架构修改如下: paloalto -> Logstash -> Elasticsearch -> Kibana

配置如下:

input {
    tcp {
        # NOTE(review): 513 is the rlogin/who service port; syslog over TCP is
        # conventionally 514 — confirm the firewall really forwards to 513.
        port => 513
        # FIX: quote the type value; bareword config values are non-standard
        # and rejected by newer Logstash releases.
        type => "paloalto"
    }
}

之后就是对 paloalto 的日志进行加工,方便投递到 Elasticsearch 中,如下:

######## 以下规则来自国外友人博客[引用]
######## PALOALTO FILTER #####################
filter {
  if [type] == "paloalto" {
    # Strip the syslog header ("<PRI>MMM dd HH:mm:ss ") and keep the PAN-OS
    # CSV payload in palo_message.
    grok {
      patterns_dir => "/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.0.2/patterns"
      # FIX: the original pattern began with "%<%{POSINT}>", which never
      # matches a syslog priority header such as "<14>"; the leading "%" must
      # not be there.
      match => [ "message", "<%{POSINT}>%{MONTH} %{MONTHDAY} %{TIME} %{GREEDYDATA:palo_message}" ]
    }

    mutate {
      # FIX: grok captures "palo_message" (with underscore); the original
      # renamed a nonexistent "palomessage" field, so "message" kept the raw
      # syslog line and every CSV column below was shifted by the header.
      rename => ["palo_message", "message"]
    }

    # TRAFFIC logs and THREAT-style logs carry different CSV column sets.
    # (The [type] == "paloalto" re-check was redundant inside this guard.)
    if [message] =~ /TRAFFIC/ {
      csv {
        # FIX: "SerialNum #" -> "SerialNum" and "ElapsedTimeInSec)" ->
        # "ElapsedTimeInSec" (stray characters; the latter also broke the
        # integer convert further down).
        columns => [ "Domain", "ReceiveTime", "SerialNum", "Type", "Threat-ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPprotocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTimeInSec", "Category", "Padding", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "pkts_sent", "pkts_received", "session_end_reason"
        ]
      }
    }
    else {
      csv {
        # FIX: "SourceCountry" was an unquoted bareword in the original list.
        columns => [ "Domain", "ReceiveTime", "SerialNum", "Type", "Threat-ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPprotocol", "Action", "URL", "Threat-ContentName", "Category", "reportid", "Severity", "Direction", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "ContentType", "pcap_id", "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient"
        ]
      }
    }

    # Use the firewall's own GenerateTime as the event timestamp.
    date {
      timezone => "Asia/Shanghai"
      match => [ "GenerateTime", "YYYY/MM/dd HH:mm:ss" ]
      target => "@timestamp"
    }

    mutate {
      convert => [ "Bytes", "integer" ]
      convert => [ "BytesReceived", "integer" ]
      convert => [ "BytesSent", "integer" ]
      convert => [ "ElapsedTimeInSec", "integer" ]
      convert => [ "geoip.area_code", "integer" ]
      convert => [ "geoip.dma_code", "integer" ]
      convert => [ "geoip.latitude", "float" ]
      convert => [ "geoip.longitude", "float" ]
      convert => [ "NATDestinationPort", "integer" ]
      convert => [ "NATSourcePort", "integer" ]
      convert => [ "Packets", "integer" ]
      convert => [ "pkts_received", "integer" ]
      convert => [ "pkts_sent", "integer" ]
      convert => [ "seqno", "integer" ]
      # Normalize rule/application names for easier aggregation.
      gsub => [ "Rule", " ", "_", "Application", "( |-)", "_" ]
      # "raw_message" is never created by this pipeline; removing it is a
      # harmless no-op kept for compatibility with the upstream recipe.
      remove_field => [ "message", "raw_message" ]
    }

    ################ GEO LOCATION ######################################

    # Geolocate SourceAddress when it is a public (non-RFC1918/link-local) IP.
    if [SourceAddress] and [SourceAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
        database => "/opt/seim/GeoLite2-City.mmdb"
        source => "SourceAddress"
        target => "SourceGeo"
      }
      # Blank out a bogus 0,0 location.
      # FIX: "[SourceGeo.location]" is not valid field-reference syntax;
      # nested fields are addressed as [SourceGeo][location].
      if [SourceGeo][location] and [SourceGeo][location] =~ "0,0" {
        mutate {
          replace => [ "[SourceGeo][location]", "" ]
        }
      }
    }

    # Geolocate DestinationAddress when it is a public (non-RFC1918) IP.
    if [DestinationAddress] and [DestinationAddress] !~ "(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)|(^169\.254\.)" {
      geoip {
        database => "/opt/seim/GeoLite2-City.mmdb"
        source => "DestinationAddress"
        target => "DestinationGeo"
      }
      # FIX: field-reference syntax as above, and the original replaced
      # "DestinationAddress.location" (copy-paste bug) instead of the
      # DestinationGeo location field.
      if [DestinationGeo][location] and [DestinationGeo][location] =~ "0,0" {
        mutate {
          replace => [ "[DestinationGeo][location]", "" ]
        }
      }
    }

    # SHA1-fingerprint the 5-tuple (src ip/port, dst ip/port, protocol) so
    # top-N queries can group on whole flows, not single fields.
    if [SourceAddress] and [DestinationAddress] {
      fingerprint {
        concatenate_sources => true
        method => "SHA1"
        key => "logstash"
        # FIX: the CSV column is "IPprotocol"; the original listed
        # "IPProtocol", which never exists on the event.
        source => [ "SourceAddress", "SourcePort", "DestinationAddress", "DestinationPort", "IPprotocol" ]
      }
    }
  }
}

output {
  # Ship parsed Palo Alto events to Elasticsearch, one index per day.
  if [type] == "paloalto" {
    elasticsearch {
      hosts => ["192.168.2.72:9200"]
      index => "logstash-palo-firewall-%{+YYYY.MM.dd}"
      # Optional custom index template, disabled for now:
      #template => "/opt/seim/elasticsearch-template.json"
      #template_overwrite => true
    }
  }
}

启动logstash

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/pa2.conf

引用