Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mongodb OP_MSG (2013) #8594

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- `http.response.body` moves to `http.response.body.content`
- Changed Packetbeat fields to align with ECS. {issue}7968[7968]
- Removed trailing dot from domain names reported by the DNS protocol. {pull}9941[9941]
- Add support for mongodb opcode 2013 (OP_MSG). {issue}6191[6191] {pull}8594[8594]

*Winlogbeat*

Expand Down
54 changes: 44 additions & 10 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ def setUpClass(self):

# Path to test binary
if not hasattr(self, 'test_binary'):
self.test_binary = os.path.abspath(self.beat_path + "/" + self.beat_name + ".test")
self.test_binary = os.path.abspath(
self.beat_path + "/" + self.beat_name + ".test")

template_paths = [
self.beat_path,
Expand Down Expand Up @@ -282,7 +283,8 @@ def read_output_json(self, output_file=None):
# hit EOF
break

event = json.loads(line, object_pairs_hook=self.json_raise_on_duplicates)
event = json.loads(
line, object_pairs_hook=self.json_raise_on_duplicates)
del event['@metadata']
jsons.append(event)
return jsons
Expand Down Expand Up @@ -319,7 +321,8 @@ def setUp(self):
fields_yml = os.path.join(self.beat_path, "fields.yml")
# Only add it if it exists
if os.path.isfile(fields_yml):
shutil.copyfile(fields_yml, os.path.join(self.working_dir, "fields.yml"))
shutil.copyfile(fields_yml, os.path.join(
self.working_dir, "fields.yml"))

try:
# update the last_run link
Expand Down Expand Up @@ -365,7 +368,8 @@ def wait_log_contains(self, msg, logfile=None,
name="log_contains",
ignore_case=False):
self.wait_until(
cond=lambda: self.log_contains(msg, logfile, ignore_case=ignore_case),
cond=lambda: self.log_contains(
msg, logfile, ignore_case=ignore_case),
max_timeout=max_timeout,
poll_interval=poll_interval,
name=name)
Expand Down Expand Up @@ -408,6 +412,30 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False):

return counter

def log_contains_countmap(self, pattern, capture_group, logfile=None):
"""
Returns a map of the number of appearances of each captured group in the log file
"""
counts = {}

if logfile is None:
logfile = self.beat_name + ".log"

try:
with open(os.path.join(self.working_dir, logfile), "r") as f:
for line in f:
res = pattern.search(line)
if res is not None:
capt = res.group(capture_group)
if capt in counts:
counts[capt] += 1
else:
counts[capt] = 1
except IOError:
pass

return counts

def output_lines(self, output_file=None):
""" Count number of lines in a file."""
if output_file is None:
Expand Down Expand Up @@ -511,7 +539,8 @@ def extract_fields(doc_list, name):
newName = field["name"]

if field.get("type") == "group":
subfields, subdictfields, subaliases = extract_fields(field["fields"], newName)
subfields, subdictfields, subaliases = extract_fields(
field["fields"], newName)
fields.extend(subfields)
dictfields.extend(subdictfields)
aliases.extend(subaliases)
Expand All @@ -530,9 +559,11 @@ def extract_fields(doc_list, name):
# TODO: Make fields_doc path more generic to work with beat-generator. If it can't find file
# "fields.yml" you should run "make update" on metricbeat folder
with open(fields_doc, "r") as f:
path = os.path.abspath(os.path.dirname(__file__) + "../../../../fields.yml")
path = os.path.abspath(os.path.dirname(
__file__) + "../../../../fields.yml")
if not os.path.isfile(path):
path = os.path.abspath(os.path.dirname(__file__) + "../../../../_meta/fields.common.yml")
path = os.path.abspath(os.path.dirname(
__file__) + "../../../../_meta/fields.common.yml")
with open(path) as f2:
content = f2.read()

Expand All @@ -551,7 +582,8 @@ def extract_fields(doc_list, name):
aliases = []

for item in doc:
subfields, subdictfields, subaliases = extract_fields(item["fields"], "")
subfields, subdictfields, subaliases = extract_fields(
item["fields"], "")
fields.extend(subfields)
dictfields.extend(subdictfields)
aliases.extend(subaliases)
Expand Down Expand Up @@ -644,6 +676,8 @@ def is_documented(key, docs):
for key in flat.keys():
metaKey = key.startswith('@metadata.')
if not(is_documented(key, expected_fields) or metaKey):
raise Exception("Key '{}' found in event is not documented!".format(key))
raise Exception(
"Key '{}' found in event is not documented!".format(key))
if is_documented(key, aliases):
raise Exception("Key '{}' found in event is documented as an alias!".format(key))
raise Exception(
"Key '{}' found in event is documented as an alias!".format(key))
1 change: 1 addition & 0 deletions packetbeat/protos/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func newTransaction(requ, resp *mongodbMessage) *transaction {
trans.params = requ.params
trans.resource = requ.resource
trans.bytesIn = requ.messageLength
trans.documents = requ.documents
}

// fill response
Expand Down
77 changes: 72 additions & 5 deletions packetbeat/protos/mongodb/mongodb_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func mongodbMessageParser(s *stream) (bool, bool) {
mutex.Lock()
defer mutex.Unlock()
if _, reported := unknownOpcodes[opCode]; !reported {
logp.Err("Unknown operation code: %v", opCode)
logp.Err("Unknown operation code: %d (%v)", opCode, opCode)
unknownOpcodes[opCode] = struct{}{}
}
return false, false
Expand All @@ -74,7 +74,7 @@ func mongodbMessageParser(s *stream) (bool, bool) {
s.message.opCode = opCode
s.message.isResponse = false // default is that the message is a request. If not opReplyParse will set this to false
s.message.expectsResponse = false
debugf("opCode = %v", s.message.opCode)
debugf("opCode = %d (%v)", s.message.opCode, s.message.opCode)

// then split depending on operation type
s.message.event = common.MapStr{}
Expand All @@ -83,9 +83,9 @@ func mongodbMessageParser(s *stream) (bool, bool) {
case opReply:
s.message.isResponse = true
return opReplyParse(d, s.message)
case opMsg:
case opMsgLegacy:
s.message.method = "msg"
return opMsgParse(d, s.message)
return opMsgLegacyParse(d, s.message)
case opUpdate:
s.message.method = "update"
return opUpdateParse(d, s.message)
Expand All @@ -105,6 +105,9 @@ func mongodbMessageParser(s *stream) (bool, bool) {
case opKillCursor:
s.message.method = "killCursors"
return opKillCursorsParse(d, s.message)
case opMsg:
s.message.method = "msg"
return opMsgParse(d, s.message)
}

return false, false
Expand Down Expand Up @@ -148,7 +151,7 @@ func opReplyParse(d *decoder, m *mongodbMessage) (bool, bool) {
return true, true
}

func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) {
func opMsgLegacyParse(d *decoder, m *mongodbMessage) (bool, bool) {
var err error
m.event["message"], err = d.readCStr()
if err != nil {
Expand Down Expand Up @@ -303,6 +306,61 @@ func opKillCursorsParse(d *decoder, m *mongodbMessage) (bool, bool) {
return true, true
}

func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) {
// ignore flagbits
_, err := d.readInt32()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}

// read sections
kind, err := d.readByte()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}

switch msgKind(kind) {
case msgKindBody:
document, err := d.readDocument()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}
m.documents = []interface{}{document}

case msgKindDocumentSequence:
start := d.i
size, err := d.readInt32()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}
cstring, err := d.readCStr()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}
m.event["message"] = cstring
var documents []interface{}
for d.i < start+size {
document, err := d.readDocument()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
}
documents = append(documents, document)
}
m.documents = documents

default:
logp.Err("Unknown message kind: %v", kind)
return false, false
}

return true, true
}

// NOTE: The following functions are inspired by the source of the go-mgo/mgo project
// https://github.com/go-mgo/mgo/blob/v2/bson/decode.go

Expand Down Expand Up @@ -335,6 +393,15 @@ func (d *decoder) readCStr() (string, error) {
return string(d.in[start:end]), nil
}

func (d *decoder) readByte() (byte, error) {
i := d.i
d.i++
if d.i > len(d.in) {
return 0, errors.New("Read byte failed")
}
return d.in[i], nil
}

func (d *decoder) readInt32() (int, error) {
b, err := d.readBytes(4)

Expand Down
11 changes: 10 additions & 1 deletion packetbeat/protos/mongodb/mongodb_structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,26 @@ type transaction struct {
documents []interface{}
}

type msgKind byte

const (
msgKindBody msgKind = 0
msgKindDocumentSequence msgKind = 1
)

type opCode int32

const (
opReply opCode = 1
opMsg opCode = 1000
opMsgLegacy opCode = 1000
opUpdate opCode = 2001
opInsert opCode = 2002
opReserved opCode = 2003
opQuery opCode = 2004
opGetMore opCode = 2005
opDelete opCode = 2006
opKillCursor opCode = 2007
opMsg opCode = 2013
)

// List of valid mongodb wire protocol operation codes
Expand All @@ -123,6 +131,7 @@ var opCodeNames = map[opCode]string{
2005: "OP_GET_MORE",
2006: "OP_DELETE",
2007: "OP_KILL_CURSORS",
2013: "OP_MSG",
}

func validOpcode(o opCode) bool {
Expand Down
Binary file not shown.
32 changes: 28 additions & 4 deletions packetbeat/tests/system/test_0025_mongodb_basic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from packetbeat import BaseTest


Expand Down Expand Up @@ -220,14 +221,37 @@ def test_request_after_reply(self):
assert o["type"] == "mongodb"
assert o["event.duration"] >= 0

def test_unknown_opcode_flood(self):
def test_opmsg(self):
"""
Tests that a repeated unknown opcode is reported just once.
Tests parser works with opcode 2013 (OP_MSG).
"""
self.render_config_template(
mongodb_ports=[9991]
)
self.run_packetbeat(pcap="mongodb_op_msg_opcode.pcap",
debug_selectors=["mongodb"])
num_msgs = self.log_contains_count('Unknown operation code: ')
assert num_msgs == 1, "Unknown opcode reported more than once: {0}".format(num_msgs)

objs = self.read_output()
o = objs[0]
assert o["type"] == "mongodb"

count = self.log_contains_count('Unknown operation code: ')
assert count == 0

def test_unknown_opcode_flood(self):
"""
Tests that any repeated unknown opcodes are reported just once.
"""
self.render_config_template(
mongodb_ports=[27017]
)
self.run_packetbeat(pcap="mongodb_invalid_opcode_2269.pcap",
debug_selectors=["mongodb"])

unknown_counts = self.log_contains_countmap(
re.compile(r'Unknown operation code: (\d+)'), 1)

assert len(unknown_counts) > 0
for k, v in unknown_counts.items():
assert v == 1, "Unknown opcode reported more than once: opcode={0}, count={1}".format(
k, v)