Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNS - implementing TCP #486

Merged
merged 3 commits into from
Dec 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 202 additions & 9 deletions packetbeat/protos/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
// messages.
//
// Future Additions:
// * Implement TcpProtocolPlugin.
// * Publish a message when packets are received that cannot be decoded.
// * Publish a message when Query packets are received that cannot be decoded.
// * Add EDNS and DNSSEC support (consider using miekg/dns instead
// of gopacket).
// * Consider adding ICMP support to
Expand All @@ -30,6 +29,7 @@ import (
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"

"github.com/tsg/gopacket"
"github.com/tsg/gopacket/layers"
Expand All @@ -45,8 +45,11 @@ const (

// Notes that are added to messages during exceptional conditions.
const (
NonDnsPacketMsg = "Packet's data could not be decoded as DNS."
DuplicateQueryMsg = "Another query with the same DNS ID from this client " +
NonDnsPacketMsg = "Packet's data could not be decoded as DNS."
NonDnsCompleteMsg = "Message's data could not be decoded as DNS."
NonDnsResponsePacketMsg = "Response packet's data could not be decoded as DNS."
EmptyMsg = "Message's data is empty."
DuplicateQueryMsg = "Another query with the same DNS ID from this client " +
"was received so this query was closed without receiving a response."
OrphanedResponseMsg = "Response was received without an associated query."
NoResponse = "No response to this query was received."
Expand All @@ -60,6 +63,8 @@ const (
TransportUdp
)

const DecodeOffset = 2

var TransportNames = []string{
"tcp",
"udp",
Expand Down Expand Up @@ -166,7 +171,7 @@ type DnsMessage struct {
// DnsStream contains DNS data from one side of a TCP transmission. A pair
// of DnsStream's are used to represent the full conversation.
type DnsStream struct {
tcptuple *common.TcpTuple
tcpTuple *common.TcpTuple

data []byte

Expand Down Expand Up @@ -318,7 +323,7 @@ func (dns *Dns) ParseUdp(pkt *protos.Packet) {
logp.Debug("dns", "Parsing packet addressed with %s of length %d.",
pkt.Tuple.String(), len(pkt.Payload))

dnsPkt, err := decodeDnsPacket(pkt.Payload)
dnsPkt, err := decodeDnsData(TransportUdp, pkt.Payload)
if err != nil {
// This means that malformed requests or responses are being sent or
// that someone is attempting to the DNS port for non-DNS traffic. Both
Expand All @@ -344,6 +349,10 @@ func (dns *Dns) ParseUdp(pkt *protos.Packet) {
}
}

func (dns *Dns) ConnectionTimeout() time.Duration {
return dns.transactionTimeout
}

func (dns *Dns) receivedDnsRequest(tuple *DnsTuple, msg *DnsMessage) {
logp.Debug("dns", "Processing query. %s", tuple)

Expand Down Expand Up @@ -690,20 +699,204 @@ func nameToString(name []byte) string {
return string(s)
}

// decodeDnsPacket decodes a byte array into a DNS struct. If an error occurs
// decodeDnsData decodes a byte array into a DNS struct. If an error occurs
// then the returnd dns pointer will be nil. This method recovers from panics
// and is concurrency-safe.
func decodeDnsPacket(data []byte) (dns *layers.DNS, err error) {
func decodeDnsData(transport Transport, data []byte) (dns *layers.DNS, err error) {
var offset int
if transport == TransportTcp {
offset = DecodeOffset
}

// Recover from any panics that occur while parsing a packet.
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()

d := &layers.DNS{}
err = d.DecodeFromBytes(data, gopacket.NilDecodeFeedback)
err = d.DecodeFromBytes(data[offset:], gopacket.NilDecodeFeedback)
if err != nil {
return nil, err
}
return d, nil
}

// TCP implementation

func (dns *Dns) Parse(pkt *protos.Packet, tcpTuple *common.TcpTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData {
defer logp.Recover("DNS ParseTcp")

logp.Debug("dns", "Parsing packet addressed with %s of length %d.",
pkt.Tuple.String(), len(pkt.Payload))

priv := dnsPrivateData{}

if private != nil {
var ok bool
priv, ok = private.(dnsPrivateData)
if !ok {
priv = dnsPrivateData{}
}
}

payload := pkt.Payload

stream := &priv.Data[dir]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh, pointer of pointer. I'd rather byte the bullet and do the assignment in line 753 instead of having to type (*stream) all over the place.

Check redis code for some cleaned-up solution (using pointers for connection) for cast and append (using streambuf.Buffer though). https://github.com/elastic/beats/blob/master/packetbeat/protos/redis/redis.go#L107

the dnsPrivateData data type is actually the connection context. Consider naming it so

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do that 😃


if *stream == nil {
*stream = &DnsStream{
tcpTuple: tcpTuple,
data: payload,
message: &DnsMessage{Ts: pkt.Ts, Tuple: pkt.Tuple},
}
if len(payload) <= DecodeOffset {
logp.Debug("dns", EmptyMsg+" addresses %s",
tcpTuple.String())

return priv
}
} else {
(*stream).data = append((*stream).data, payload...)
dataLength := len((*stream).data)
if dataLength > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("dns", "Stream data too large, dropping DNS stream")
return priv
}
if dataLength <= DecodeOffset {
logp.Debug("dns", EmptyMsg+" addresses %s",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message is not empty, but it is incomplete and the code will wait for more bytes to complete the message. I think the other protocols log messages for a similar case so maybe take a look at those messages.

tcpTuple.String())
return priv
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once two bytes are received, they can be decoded as a uint16 then the length the buffer can be checked to see if it is 2 + [length from packet].

  • If the check fails, then just return priv so that it waits for more bytes to be received.
  • If the check passes, then decode the packet. If decoding fails, drop the data (? since waiting for more bytes is not the answer).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm not sure to understand the length the buffer can be checked to see if it is 2 + [length from packet]. From what I understand though, a message might be decodable only if its length is superior to 2. And the only way to make sure it is decodable is to try and decode it each time a new payload is received, i.e. when Parse() is invoked.
Please correct me if I'm wrong.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I am suggesting is to follow RFC 1035's recommendation: "This length field allows
the low-level processing to assemble a complete message before beginning
to parse it."

So instead of trying to parse the payload each time data is received, only begin parsing after the complete message has been received. The way to make the determination that the complete message has been received is to check the length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I finally get it 👍 Dunno why it took me so long

}

data, err := decodeDnsData(TransportTcp, (*stream).data)

if err != nil {
logp.Debug("dns", NonDnsCompleteMsg+" addresses %s, length %d",
tcpTuple.String(), len((*stream).data))

// wait for decoding with the next segment
return priv
}

dns.messageComplete(tcpTuple, dir, *stream, data)
return priv
}

func (dns *Dns) messageComplete(tcpTuple *common.TcpTuple, dir uint8, s *DnsStream, decodedData *layers.DNS) {
dns.handleDns(s.message, tcpTuple, dir, s.data, decodedData)

s.PrepareForNewMessage()
}

func (dns *Dns) handleDns(m *DnsMessage, tcpTuple *common.TcpTuple, dir uint8, data []byte, decodedData *layers.DNS) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tcp specific or use by TCP and UDP? If tcp specific, maybe reflect this in the name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handleDns has parameters that make it specific to TCP, si IMO it is not necessary to precise it. But if you really think it should also be reflected in the name, please let me know.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not critical and I'm also fine with calling it handleDns, but it's almost 900 lines mixing UDP and TCP based handling. I just stumbled on this when reading not knowing the package in detail. The memcache plugin even splits the module in plugin_tcp.go and plugin_udp.go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I will probably split the file in a future PR.

dnsTuple := DnsTupleFromIpPort(&m.Tuple, TransportTcp, decodedData.ID)
m.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcpTuple.IpPort())
m.Data = decodedData
m.Length = len(data)

if decodedData.QR == Query {
dns.receivedDnsRequest(&dnsTuple, m)
} else /* Response */ {
dns.receivedDnsResponse(&dnsTuple, m)
}
}

func (stream *DnsStream) PrepareForNewMessage() {
stream.message = nil
}

func (dns *Dns) ReceivedFin(tcpTuple *common.TcpTuple, dir uint8, private protos.ProtocolData) protos.ProtocolData {
if private == nil {
return private
}
dnsData, ok := private.(dnsPrivateData)
if !ok {
return private
}
if dnsData.Data[dir] == nil {
return dnsData
}
stream := dnsData.Data[dir]
if stream.message != nil {
decodedData, err := decodeDnsData(TransportTcp, stream.data)

if err == nil {
dns.messageComplete(tcpTuple, dir, stream, decodedData)
} else /*Failed decode */ {
if dir == tcp.TcpDirectionReverse {
dns.publishDecodeFailureNotes(dnsData)
stream.PrepareForNewMessage()
}
logp.Debug("dns", NonDnsCompleteMsg+" addresses %s, length %d",
tcpTuple.String(), len(stream.data))
}
}

return dnsData
}

func (dns *Dns) GapInStream(tcpTuple *common.TcpTuple, dir uint8, nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {
dnsData, ok := private.(dnsPrivateData)

if !ok {
return private, false
}

stream := dnsData.Data[dir]

if stream == nil || stream.message == nil {
return private, false
}

decodedData, err := decodeDnsData(TransportTcp, stream.data)

// Add Notes if the failed stream is the response
if err != nil {
if dir == tcp.TcpDirectionReverse {
dns.publishDecodeFailureNotes(dnsData)
}

// drop the stream because it is binary and it would be rare to have a decodable message later
logp.Debug("dns", NonDnsCompleteMsg+" addresses %s, length %d",
tcpTuple.String(), len(stream.data))
return private, true
}

// publish and ignore the gap. No case should reach this code though ...
dns.messageComplete(tcpTuple, dir, stream, decodedData)
return private, false
}

// Add Notes to the query stream about a failure to decode the response
func (dns *Dns) publishDecodeFailureNotes(dnsData dnsPrivateData) {
streamOrigin := dnsData.Data[tcp.TcpDirectionOriginal]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't these two values (streamOrigin and streamReverse) be nil checked before use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamReverse is checked before publishDecodeFailureNotes is called (in Gap and Fin). It will be checked a second time in publishDecodeFailureNotes, except if you don't think it's proper.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about streamOrigin, is it also checked prior to invoking publishDecodeFailureNotes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it will be done in publishDecodeFailureNotes

streamReverse := dnsData.Data[tcp.TcpDirectionReverse]

if streamOrigin == nil || streamReverse == nil {
return
}

dataOrigin, err := decodeDnsData(TransportTcp, streamOrigin.data)
tupleReverse := streamReverse.message.Tuple

if err == nil {
dnsTupleReverse := DnsTupleFromIpPort(&tupleReverse, TransportTcp, dataOrigin.ID)
hashDnsTupleOrigin := (&dnsTupleReverse).RevHashable()

trans := dns.deleteTransaction(hashDnsTupleOrigin)

if trans == nil { // happens when a Gap is followed by Fin
return
}

trans.Notes = append(trans.Notes, NonDnsResponsePacketMsg)

dns.publishTransaction(trans)
dns.deleteTransaction(hashDnsTupleOrigin)
} else {
logp.Debug("dns", "Unabled to decode response with adresses %s has no associated query", streamReverse.tcpTuple.String())
}
}
Loading