-
Notifications
You must be signed in to change notification settings - Fork 7
/
main.py
310 lines (269 loc) · 13.8 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
import sys
import json
import requests
from github import Github, BadCredentialsException
from github.GithubException import UnknownObjectException
from base64 import b64encode
from nacl import encoding, public
args = sys.argv[1:]
source = None
createCommand = "create"
deleteCommand = "delete"
updateCommand = "update"
tokenCommand = "--token"
namesCommand = "--names"
valuesCommand = "--values"
teamCommand = "--team"
repoCommand = "--repo"
dependabotSecretsSyncOption = "--dependabotsecretssync"
repositoryOwner = "--repoowner"
interactiveCommand = "--interactive"
defaultRepoOwnerName = "org-name"
noTokenMessage = "Please provide a valid GitHub PAT using --token <PAT>."
invalidTokenMessage = "The token you provided is invalid."
noActionMessage = "You must specify an action, either create or delete."
noNamesMessage = "You must specify the name(s) of the secrets to be modified."
noTeamMessage = "Invalid team name provided."
invalidNamesAndSecretsMessage = "Secret names and secret values lists are not the same length. This may be due to an " \
"invalid input or a secret that contains a comma. Secrets with comma(s) are currently" \
" not supported."
invalidRepoOwnerMessage = "Invalid Repository Owner name, please pass the value to sync dependabot secrets!"
class UserInput:
def __init__(self, token, action, secret_names, secret_values, target_team_name, target_repo_name, dependabotSecretsSync, repoOwner, interactive):
self.token = token
self.action = action
self.secret_names = secret_names
self.secret_values = secret_values
self.target_team_name = target_team_name
self.target_repo_name = target_repo_name
self.dependabotSecretsSync = dependabotSecretsSync
self.repoOwner = repoOwner
self.interactive = interactive
def get_mandatory_value_from_input(arg_list, label, error_message):
if label in arg_list:
return get_element_after_value(arg_list, label, error_message)
else:
raise ValueError(error_message)
def get_optional_value_from_input(arg_list, label):
if label in arg_list:
return get_element_after_value(arg_list, label,
f"{label} was included in the input but no value was provided")
else:
return ""
def get_element_after_value(arg_list, value, error_message):
try:
result = arg_list[arg_list.index(value) + 1]
if not does_string_start_with_two_dashes(result):
return result
except IndexError:
pass
raise ValueError(error_message)
def does_string_start_with_two_dashes(str1):
if len(str1) < 2:
return False
else:
return str1[0:2] == "--"
def get_github_user(token, message):
try:
g = Github(token)
print(f"Logged in as {g.get_user().name}")
return g
except BadCredentialsException:
raise ValueError(message)
def apply_action(repo_name):
return "y" in input(f"Apply action to {repo_name}? (y/n) ").lower()
def validate_action(candidate_action, create_command, update_command, delete_command, secret_names, secret_values):
if delete_command.lower() in candidate_action.lower():
return delete_command
if update_command.lower() in candidate_action.lower():
return update_command
if create_command.lower() in candidate_action.lower():
if len(secret_names) != len(secret_values):
raise ValueError(invalidNamesAndSecretsMessage)
return create_command
raise ValueError(f"{candidate_action} is not a valid action! Please enter \"{create_command}\",\"{update_command}\""
+ f"\"or {delete_command}\" as the first argument")
def get_input_from_user():
token = input("Github PAT: ")
action = input("Desired action to be performed (create/delete): ")
secret_names = input("Comma separated list of secret names: ").split(',')
if action.lower() != deleteCommand:
secret_values = input("Comma separated list of secret values: ").split(',')
validate_action(action, createCommand, updateCommand, deleteCommand, secret_names, secret_values)
if "y" in input("Limit tool to a github team? (y/n)").lower():
target_team_name = input("Team name: ")
else:
target_team_name = ""
if "y" in input("Limit tool to a specific repo? (y/n)").lower():
target_repo_name = input("Repo name: ")
else:
target_repo_name = ""
interactive = "y" in input("Prompt for approval before applying action to each repo? (y/n)").lower()
return UserInput(token, action, secret_names, secret_values, target_team_name, target_repo_name, interactive)
def get_input_from_cli():
token = get_mandatory_value_from_input(args, tokenCommand, noTokenMessage)
secret_names = get_mandatory_value_from_input(args, namesCommand, noNamesMessage).split(',')
secret_values = get_optional_value_from_input(args, valuesCommand).split(',')
target_team_name = get_optional_value_from_input(args, teamCommand)
target_repo_name = get_optional_value_from_input(args, repoCommand)
dependabotSecretsSync = get_optional_value_from_input(args, dependabotSecretsSyncOption)
repoOwner = get_optional_value_from_input(args, repositoryOwner)
interactive = interactiveCommand in args
action = validate_action(args[0], createCommand, updateCommand, deleteCommand, secret_names, secret_values)
return UserInput(token, action, secret_names, secret_values, target_team_name, target_repo_name, dependabotSecretsSync, repoOwner, interactive)
def flatten_secrets_dict(dict_of_secrets):
list_of_secrets = []
for secret in dict_of_secrets:
list_of_secrets.append(secret["name"])
return list_of_secrets
def get_repo_public_key(token, repo_owner, repo_name):
query_url = f"https://github.com/gitapi/repos/{repo_owner}/{repo_name}/dependabot/secrets/public-key"
headers = {'Authorization': f'token {token}'}
r = requests.get(query_url, headers=headers)
response = r.json()
return response["key_id"], response["key"]
def encrypt(public_key: str, secret_value: str) -> str:
"""Encrypt a Unicode string using the public key."""
public_key = public.PublicKey(public_key.encode("utf-8"), encoding.Base64Encoder())
sealed_box = public.SealedBox(public_key)
encrypted = sealed_box.encrypt(secret_value.encode("utf-8"))
return b64encode(encrypted).decode("utf-8")
def add_secret(token, target_repository, secret_name, secret_value):
repo_full_name = target_repository.full_name
repo_name = target_repository.name
query_url = f"https://github.com/gitapi/repos/{repo_full_name}/actions/secrets"
headers = {'Authorization': f'token {token}'}
r = requests.get(query_url, headers=headers)
response = r.json()
try:
secret_names = flatten_secrets_dict(response["secrets"])
except:
secret_names = []
if secret_name not in secret_names:
print(f"Secret \"{secret_name}\" added to {repo_name}")
target_repository.create_secret(secret_name, secret_value)
else:
print(f"Secret \"{secret_name}\" already exists in {repo_name}")
def add_dependabot_secret(token, target_repository, secret_name, secret_value, repoOwner):
repo_name = target_repository.name
repo_owner = repoOwner
key_id, key = get_repo_public_key(token, repo_owner, repo_name)
query_url = f"https://github.com/gitapi/repos/{repo_owner}/{repo_name}/dependabot/secrets"
headers = {'Authorization': f'token {token}'}
r = requests.get(query_url, headers=headers)
response = r.json()
try:
secret_names = flatten_secrets_dict(response["secrets"])
except:
secret_names = []
if secret_name not in secret_names:
# put call add repo secrets to dependabot secrets
url = f"https://github.com/gitapi/repos/{repo_owner}/{repo_name}/dependabot/secrets/{secret_name}"
data = {
"encrypted_value": encrypt(key, secret_value),
"key_id": key_id
}
response = requests.put(url, headers=headers, data=json.dumps(data))
print(f"Response Code: {response.status_code}")
if response.status_code == 201:
print(f"dependabot Secret \"{secret_name}\" added to {repo_name}")
else:
print(f"dependabot Secret \"{secret_name}\" could NOT be added to {repo_name}")
else:
print(f"dependabot Secret \"{secret_name}\" already exists in {repo_name}")
def update_dependabot_secret(token, target_repository, secret_name, secret_value, repoOwner):
repo_name = target_repository.name
repo_owner = repoOwner
key_id, key = get_repo_public_key(token, repo_owner, repo_name)
query_url = f"https://github.com/gitapi/repos/{repo_owner}/{repo_name}/dependabot/secrets"
headers = {'Authorization': f'token {token}'}
r = requests.get(query_url, headers=headers)
response = r.json()
try:
secret_names = flatten_secrets_dict(response["secrets"])
except:
secret_names = []
if secret_name not in secret_names:
# patch call update repo secrets to dependabot secrets
url = f"https://github.com/gitapi/repos/{repo_owner}/{repo_name}/dependabot/secrets/{secret_name}"
data = {
"encrypted_value": encrypt(key, secret_value),
"key_id": key_id
}
response = requests.patch(url, headers=headers, data=json.dumps(data))
print(f"Response Code: {response.status_code}")
if response.status_code == 204:
print(f"dependabot Secret \"{secret_name}\" updated in {repo_name}")
else:
print(f"dependabot Secret \"{secret_name}\" could NOT be updated in {repo_name}")
def delete_dependabot_secret(token, target_repository, secret_name, repoOwner):
repo_name = target_repository.name
repo_owner = repoOwner
headers = {'Authorization': f'token {token}'}
# put call add repo secrets to dependabot secrets
url = f"https://github.com/gitapi/repos/{repo_owner}/{repo_name}/dependabot/secrets/{secret_name}"
response = requests.delete(url, headers=headers)
print(f"Response Code: {response.status_code}")
if response.status_code == 204:
print(f"dependabot Secret \"{secret_name}\" deleted from {repo_name}")
else:
print(f"dependabot Secret \"{secret_name}\" could NOT be deleted from {repo_name}")
if __name__ == "__main__":
if len(args) == 0:
inp = get_input_from_user()
else:
inp = get_input_from_cli()
print(inp.repoOwner)
g = get_github_user(inp.token, invalidTokenMessage)
if inp.target_team_name != "":
for team in g.get_user().get_teams(): # There is no method to get a team by name
if team.name == inp.target_team_name:
source = team
if source is None:
raise ValueError(noTeamMessage)
else:
source = g.get_user()
for repo in source.get_repos():
if(inp.target_repo_name != "" and repo.name != inp.target_repo_name):
continue
for i in range(len(inp.secret_names)):
if len(inp.dependabotSecretsSync) == 0 or inp.dependabotSecretsSync.lower() == "yes" or inp.dependabotSecretsSync.lower() == '':
if not inp.interactive or apply_action(repo.name):
try:
if inp.action == createCommand:
add_secret(inp.token, repo, inp.secret_names[i], inp.secret_values[i])
if not inp.repoOwner == defaultRepoOwnerName:
add_dependabot_secret(inp.token, repo, inp.secret_names[i], inp.secret_values[i], inp.repoOwner)
else:
raise ValueError(invalidRepoOwnerMessage)
if inp.action == updateCommand:
c = repo.get_contributors()
repo.create_secret(inp.secret_names[i], inp.secret_values[i])
print(f"Secret \"{inp.secret_names[i]}\" updated for {repo.name}")
if not inp.repoOwner == defaultRepoOwnerName:
update_dependabot_secret(inp.token, repo, inp.secret_names[i], inp.secret_values[i], inp.repoOwner)
else:
raise ValueError(invalidRepoOwnerMessage)
if inp.action == deleteCommand:
repo.delete_secret(inp.secret_names[i])
print(f"Secret \"{inp.secret_names[i]}\" removed from {repo.name}")
if not inp.repoOwner == defaultRepoOwnerName:
delete_dependabot_secret(inp.token, repo, inp.secret_names[i], inp.repoOwner)
else:
raise ValueError(invalidRepoOwnerMessage)
except UnknownObjectException:
print(f"The provided token does not have permission to manage {repo.name}, it is being skipped")
else:
if not inp.interactive or apply_action(repo.name):
try:
if inp.action == createCommand:
add_secret(inp.token, repo, inp.secret_names[i], inp.secret_values[i])
if inp.action == updateCommand:
c = repo.get_contributors()
repo.create_secret(inp.secret_names[i], inp.secret_values[i])
print(f"Secret \"{inp.secret_names[i]}\" updated for {repo.name}")
if inp.action == deleteCommand:
repo.delete_secret(inp.secret_names[i])
print(f"Secret \"{inp.secret_names[i]}\" removed from {repo.name}")
except UnknownObjectException:
print(f"The provided token does not have permission to manage {repo.name}, it is being skipped")