Skip to content

Commit

Permalink
[SIEM] Add DNS enrichment to system/socket dataset
Browse files Browse the repository at this point in the history
This patch adds DNS monitoring to the system/socket dataset so that
events can be enriched with domain names.

Its been implemented using af_packet so that filtering can be used at
the kernel level and only DNS responses are copied to userspace.

In the future it might be possible to use a kprobes-based implementation
which will make correlating DNS requests to processes much easier, but
we need to find out how to filter DNS requests at kprobe level, given
that sometimes is necessary to inspect the packet to obtain the source
port.
  • Loading branch information
adriansr committed Oct 11, 2019
1 parent 46851d2 commit 158328e
Show file tree
Hide file tree
Showing 12 changed files with 882 additions and 47 deletions.
4 changes: 0 additions & 4 deletions x-pack/auditbeat/docs/modules/system.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ datasets - esp. `process` and `socket` - a shorter period is recommended.
*`state.period`*:: The frequency at which the datasets send full state information.
This option can be overridden per dataset using `{dataset}.state.period`.

*`socket.include_localhost`*:: If the `socket` dataset is configured and this
option is set to `true`, Auditbeat will include sockets that have localhost
as either their source and/or destination IP. Defaults to `false`.

*`user.detect_password_changes`*:: If the `user` dataset is configured and
this option is set to `true`, Auditbeat will read password information in `/etc/passwd`
and `/etc/shadow` to detect password changes. A hash will be kept locally in
Expand Down
4 changes: 0 additions & 4 deletions x-pack/auditbeat/module/system/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ datasets - esp. `process` and `socket` - a shorter period is recommended.
*`state.period`*:: The frequency at which the datasets send full state information.
This option can be overridden per dataset using `{dataset}.state.period`.

*`socket.include_localhost`*:: If the `socket` dataset is configured and this
option is set to `true`, Auditbeat will include sockets that have localhost
as either their source and/or destination IP. Defaults to `false`.

*`user.detect_password_changes`*:: If the `user` dataset is configured and
this option is set to `true`, Auditbeat will read password information in `/etc/passwd`
and `/etc/shadow` to detect password changes. A hash will be kept locally in
Expand Down
17 changes: 17 additions & 0 deletions x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and https://www.elastic.co/guide/en/ecs/current/ecs-user.html[user] information.
and memory usage.
- Works on stock kernels without the need of custom modules, external libraries
or development headers.
- Correlates IP addresses with DNS requests.
This dataset does not analyze application-layer protocols nor provide any other
advanced features present in Packetbeat:
Expand Down Expand Up @@ -156,3 +157,19 @@ between the kernel clock and the dataset's reference clock.
- `socket.guess_timeout` (default: 15s)

The maximum time an individual guess is allowed to run.

- `socket.dns.enabled` (default: true)

If DNS traffic must be monitored to enrich network flows with DNS information.

- `socket.dns.type` (default: af_packet)

The method used to monitor DNS traffic. Currently, only `af_packet` is supported.

- `socket.dns.af_packet.interface` (default: any)

The network interface where DNS will be monitored.

- `socket.dns.af_packet.snaplen` (default: 1024)

Maximum number of bytes to copy for each captured packet.
252 changes: 252 additions & 0 deletions x-pack/auditbeat/module/system/socket/dns/afpacket/afpacket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build linux

package afpacket

import (
"context"
"net"
"os"
"time"

"github.com/dustin/go-humanize"
"github.com/miekg/dns"
"github.com/pkg/errors"
"golang.org/x/net/bpf"

"github.com/elastic/beats/metricbeat/mb"

"github.com/google/gopacket"
"github.com/google/gopacket/afpacket"
"github.com/google/gopacket/layers"

"github.com/elastic/beats/libbeat/logp"
parent "github.com/elastic/beats/x-pack/auditbeat/module/system/socket/dns"
)

var udpSrcPort53Filter = []bpf.RawInstruction{
{Op: 0x28, Jt: 0x0, Jf: 0x0, K: 0xc},
{Op: 0x15, Jt: 0x0, Jf: 0x4, K: 0x86dd},
{Op: 0x30, Jt: 0x0, Jf: 0x0, K: 0x14},
{Op: 0x15, Jt: 0x0, Jf: 0xb, K: 0x11},
{Op: 0x28, Jt: 0x0, Jf: 0x0, K: 0x36},
{Op: 0x15, Jt: 0x8, Jf: 0x9, K: 0x35},
{Op: 0x15, Jt: 0x0, Jf: 0x8, K: 0x800},
{Op: 0x30, Jt: 0x0, Jf: 0x0, K: 0x17},
{Op: 0x15, Jt: 0x0, Jf: 0x6, K: 0x11},
{Op: 0x28, Jt: 0x0, Jf: 0x0, K: 0x14},
{Op: 0x45, Jt: 0x4, Jf: 0x0, K: 0x1fff},
{Op: 0xb1, Jt: 0x0, Jf: 0x0, K: 0xe},
{Op: 0x48, Jt: 0x0, Jf: 0x0, K: 0xe},
{Op: 0x15, Jt: 0x0, Jf: 0x1, K: 0x35},
{Op: 0x6, Jt: 0x0, Jf: 0x0, K: 0xffff},
{Op: 0x6, Jt: 0x0, Jf: 0x0, K: 0x0},
}

type dnsCapture struct {
tPacket *afpacket.TPacket
log *logp.Logger
}

func init() {
parent.Registry.MustRegister("af_packet", newAFPacketSniffer)
}

func newAFPacketSniffer(base mb.BaseMetricSet, log *logp.Logger) (parent.Sniffer, error) {
config := defaultConfig()
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, errors.Wrap(err, "failed to unpack af_packet config")
}

frameSize, blockSize, numBlocks, err := afpacketComputeSize(8*humanize.MiByte, config.Snaplen, os.Getpagesize())
if err != nil {
return nil, err
}

opts := []interface{}{
afpacket.OptFrameSize(frameSize),
afpacket.OptBlockSize(blockSize),
afpacket.OptNumBlocks(numBlocks),
afpacket.SocketRaw,
// Configure a poll timeout so that the capture goroutine
// wakes up periodically to check for termination.
afpacket.OptPollTimeout(time.Millisecond * 500),
}

if config.Interface != "any" {
opts = append(opts, afpacket.OptInterface(config.Interface))
}

tPacket, err := afpacket.NewTPacket(opts...)
if err != nil {
return nil, errors.Wrap(err, "failed creating af_packet sniffer")
}

if err = tPacket.SetBPF(udpSrcPort53Filter); err != nil {
tPacket.Close()
return nil, errors.Wrapf(err, "failed setting BPF filter")
}

c := &dnsCapture{
tPacket: tPacket,
log: log,
}

return c, nil
}

// Monitor starts monitoring for DNS transactions in the background.
func (c *dnsCapture) Monitor(ctx context.Context, consumer parent.Consumer) error {
go c.run(ctx, consumer)
return nil
}

var (
errNotIP = errors.New("network is not IP")
errNotUDP = errors.New("transport is not UDP")
)

func dupSlice(in []byte) []byte {
out := make([]byte, len(in))
copy(out, in)
return out
}

func getEndpoints(pkt gopacket.Packet) (src net.UDPAddr, dst net.UDPAddr, err error) {
netLayer := pkt.NetworkLayer()
if netLayer == nil {
return src, dst, errNotIP
}
switch v := netLayer.(type) {
case *layers.IPv4:
src.IP = dupSlice(v.SrcIP)
dst.IP = dupSlice(v.DstIP)
case *layers.IPv6:
src.IP = dupSlice(v.SrcIP)
dst.IP = dupSlice(v.DstIP)
default:
return src, dst, errNotIP
}
transLayer := pkt.TransportLayer()
if transLayer == nil ||
transLayer.LayerType() != layers.LayerTypeUDP {
return src, dst, errNotUDP
}
udp, ok := transLayer.(*layers.UDP)
if !ok {
return src, dst, errNotUDP
}
src.Port = int(udp.SrcPort)
dst.Port = int(udp.DstPort)
return src, dst, nil
}

func (c *dnsCapture) run(ctx context.Context, consumer parent.Consumer) {
defer c.tPacket.Close()
source := gopacket.ZeroCopyPacketDataSource(c.tPacket)
c.log.Info("Starting DNS capture.")
defer c.log.Info("Stopping DNS capture.")
for {
select {
case <-ctx.Done():
return
default:
}
data, _, err := source.ZeroCopyReadPacketData()
if err != nil {
if err == afpacket.ErrTimeout {
continue
}
c.log.Error("DNS capture error", err)
return
}

pkt := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.NoCopy)
src, dst, err := getEndpoints(pkt)
if err != nil {
c.log.Warn("Failed to decode UDP packet.", err)
continue
}
msg := &dns.Msg{}
if err = msg.Unpack(pkt.TransportLayer().LayerPayload()); err != nil {
c.log.Warn("Failed to unpack UDP payload from port 53.", err)
continue
}

if len(msg.Question) == 0 || (msg.Question[0].Qtype != dns.TypeA && msg.Question[0].Qtype != dns.TypeAAAA) {
continue
}
questionName := trimRightDot(msg.Question[0].Name)
tr := parent.Transaction{
TXID: msg.Id,
Client: dst,
Server: src,
Domain: questionName,
Addresses: make([]net.IP, 0, len(msg.Answer)),
}
for _, ans := range msg.Answer {
switch ans.Header().Rrtype {
case dns.TypeA:
if a, ok := ans.(*dns.A); ok {
tr.Addresses = append(tr.Addresses, a.A)
} else {
c.log.Debug("Unexpected type for DNS A response")
}
case dns.TypeAAAA:
if a, ok := ans.(*dns.AAAA); ok {
tr.Addresses = append(tr.Addresses, a.AAAA)
} else {
c.log.Debug("Unexpected type for DNS AAAA response")
}
default:
continue
}
}
if len(tr.Addresses) > 0 {
if c.log.IsDebug() {
c.log.Debugf("Got DNS transaction client=%s server=%s domain=%s addresses=%v",
tr.Client.String(),
tr.Server.String(),
tr.Domain,
tr.Addresses)
}
consumer(tr)
}
}
}

// Helpers

// afpacketComputeSize computes the block_size and the num_blocks in such a way
// that the allocated mmap buffer is close to but smaller than target_size_mb.
// The restriction is that the block_size must be divisible by both the
// frame size and page size.
func afpacketComputeSize(targetSize int, snaplen int, pageSize int) (
frameSize int, blockSize int, numBlocks int, err error) {

if snaplen < pageSize {
frameSize = pageSize / (pageSize / snaplen)
} else {
frameSize = (snaplen/pageSize + 1) * pageSize
}

// 128 is the default from the gopacket library so just use that
blockSize = frameSize * 128
numBlocks = targetSize / blockSize

if numBlocks == 0 {
return 0, 0, 0, errors.New("Interface buffersize is too small")
}

return frameSize, blockSize, numBlocks, nil
}

func trimRightDot(name string) string {
if len(name) == 0 || name == "." || name[len(name)-1] != '.' {
return name
}
return name[:len(name)-1]
}
19 changes: 19 additions & 0 deletions x-pack/auditbeat/module/system/socket/dns/afpacket/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package afpacket

type config struct {
// Interface to listen on. Defaults to "any".
Interface string `config:"socket.dns.af_packet.interface"`
// Snaplen is the packet snapshot size.
Snaplen int `config:"socket.dns.af_packet.snaplen"`
}

func defaultConfig() config {
return config{
Interface: "any",
Snaplen: 1024,
}
}
19 changes: 19 additions & 0 deletions x-pack/auditbeat/module/system/socket/dns/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package dns

type config struct {
// Enabled toggles the DNS monitoring feature.
Enabled bool `config:"socket.dns.enabled"`
// Type is the dns monitoring implementation used.
Type string `config:"socket.dns.type"`
}

func defaultConfig() config {
return config{
Enabled: true,
Type: "af_packet",
}
}
Loading

0 comments on commit 158328e

Please sign in to comment.