forked from chrisschaaf/github-org-find-replace
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
148 lines (120 loc) · 4.66 KB
/
main.py
File metadata and controls
148 lines (120 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import os
import github
import click
import re
class Updater:
def __init__(self, repo, paths):
self.repo = repo
self.paths = paths
self.old_contents = {}
self.old_shas = {} # store the shas for the original files, (for use when doing update_file)
self.new_contents = {}
def get_old_contents(self):
for p in self.paths:
f = self.repo.get_contents(p)
self.old_contents[p] = f.decoded_content.decode("utf-8")
self.old_shas[p] = f.sha
def find_replace(self, find, replace):
for p, old_content in self.old_contents.items():
new_content = re.sub("[a-z\\/-a-z]*@v1[-a-z]*[0-9]*",replace, old_content)
if new_content == old_content:
print("(no change in content)")
continue
self.new_contents[p] = new_content
print(f"Old contents of {self.repo} {p}\n---------")
print(old_content)
print("---------")
print(f"New contents of {self.repo} {p}\n---------")
print(new_content)
print("---------")
def create_pr(self, message, branch_name, labels):
if not self.new_contents:
print("no new contents to use")
return
ref = self.repo.create_git_ref(
f"refs/heads/{branch_name}",
self.repo.get_commits()[0].sha,
)
for p, new_content in self.new_contents.items():
old_file_sha = self.repo.get_contents(p, ref.ref).sha
self.repo.update_file(
p,
message,
new_content,
old_file_sha,
branch_name,
)
pull = self.repo.create_pull(
message,
"",
self.repo.default_branch,
branch_name,
)
if labels:
pull.add_to_labels(*labels)
print(pull.html_url)
@click.command()
@click.option('-h', '--ghe-hostname', type=str, default="", help='Github Enterprise Hostname, example: \'github.company.com\'')
@click.option('-o', '--organization', type=str, required=True)
@click.option('-f', '--find', type=str, required=True)
@click.option('-r', '--replace', type=str, required=True)
@click.option('-e', '--extra-search-params', type=str, default="")
@click.option('-i', '--ignore-existing-branch', is_flag=True, default=False)
@click.pass_context
def cli(
ctx,
ghe_hostname,
organization,
find,
replace,
extra_search_params,
ignore_existing_branch
):
# allow \n to be given on the command line user input
# find = find.replace('\\n', '\n')
# replace = replace.replace('\\n', '\n')
if not ghe_hostname:
gh = github.Github(os.environ["GITHUB_API_TOKEN"])
else:
gh = github.Github(base_url=f"https://{ghe_hostname}/api/v3", login_or_token=os.environ["GITHUB_API_TOKEN"])
query = f"org:{organization} {extra_search_params} in:file '{find}'"
results = gh.search_code(query)
if not results.totalCount:
click.echo(f"No results found for:\n{query}")
return
repo_objs_by_name = {result.repository.full_name: result.repository for result in results if not result.repository.archived}
repo_files = {}
for result in results:
if result.repository.archived:
print(f"skipping archived repo {result.repository}")
continue
files_in_repo = repo_files.get(result.repository.full_name, [])
files_in_repo.append(result.path)
repo_files[result.repository.full_name] = files_in_repo
updaters = [Updater(repo, repo_files[name]) for name, repo in repo_objs_by_name.items()]
click.secho(f"Found matches in {len(repo_files)} repos.")
if not click.confirm("See potential changes?"):
return
for u in updaters:
click.secho(str(u.repo), fg="magenta")
u.get_old_contents()
u.find_replace(find, replace)
if not click.confirm("Ready to send these as PRs? We'll get some more information first."):
return
title = click.prompt("Commit message / PR title")
branch_name = click.prompt("Branch name")
labels_in_commas = click.prompt("PR labels", default="")
labels = []
if labels_in_commas:
labels = [x.strip() for x in labels_in_commas.split(",")]
for u in updaters:
click.secho(str(u.repo), fg="magenta")
try:
u.create_pr(title, branch_name, labels)
except github.GithubException as err:
if ignore_existing_branch and err.status == 422:
print(f"Branch already exists on {u.repo}, ignoring it")
else:
raise err
if __name__ == "__main__":
cli()