-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathrepository.py
158 lines (138 loc) · 6.11 KB
/
repository.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import requests
from datetime import datetime
import utils
import os
import settingsTools
from pathlib import Path
# Module-level aliases for the shared objects exposed by settingsTools.
# `settings` is subscripted by key below (e.g. "show_last_X_commits");
# `locales` is used through its `adv_print` and `message` methods.
settings = settingsTools.settings
locales = settingsTools.locales
class Version(object):
    """Parsed contents of a ``version.txt`` file.

    The text is interpreted line by line: line 1 is the version, line 2 the
    branch and line 3 the commit hash. Lines that are absent (and an empty
    commit-hash line) are stored as ``None``.
    """

    def __init__(self, content):
        """Parse *content*, the full text of a version file."""
        # splitlines() already removes the line terminators.
        lines = content.splitlines()
        self.version = lines[0] if len(lines) >= 1 else None
        self.branch = lines[1] if len(lines) >= 2 else None
        self.commit_hash = lines[2] if len(lines) >= 3 and lines[2] != "" else None
        # Path of the file this object was loaded from; only set by
        # get_version_file(), otherwise None.
        self.version_file = None

    def write_commit_hash(self, hash):
        """Store *hash* as the third line of the backing version file.

        Does nothing when the object is not bound to a file
        (``version_file`` is ``None``). Only the file is rewritten; the
        in-memory ``commit_hash`` attribute is left untouched.
        """
        if self.version_file is None:
            return
        with open(self.version_file) as fr:
            lines = fr.readlines()
        # A file whose last line has no trailing newline would otherwise get
        # the hash glued onto that line.
        if lines and not lines[-1].endswith("\n"):
            lines[-1] += "\n"
        # Pad short files so the hash always lands on line 3, which is where
        # __init__ reads it from.
        while len(lines) < 3:
            lines.append("\n")
        lines[2] = f"{hash}\n"
        with open(self.version_file, "w") as fw:
            fw.writelines(lines)

    @classmethod
    def get_version_file(cls):
        """Load the first ``version.txt`` found under the current directory.

        The search is recursive (``Path(".").rglob``). The returned object has
        ``version_file`` set to the path that was read. When no file is found,
        an empty instance (all fields ``None``) is returned.
        """
        for file in Path(".").rglob("version.txt"):
            strpath = str(file)
            with open(strpath) as f:
                instance = cls(f.read())
            instance.version_file = strpath
            return instance
        return cls("")
class Repository(object):
    """Thin client for one GitHub repository, identified as ``owner/name``.

    Talks to the public GitHub REST API and raw.githubusercontent.com via
    ``requests`` and keeps fetched trees and branch data in ``self.cache``.
    """

    # How to deal with API rate limiting:
    # Create a ws server that clones the repository on any change
    # (bother Ratto to get an awesome webhook, or clone the repo every 5 seconds).
    # Implement an API for it and access it as if it were the GitHub API.

    def __init__(self, name):
        """Create a client for the repository *name* (``owner/name``)."""
        self.name = name
        # Layout: {"tree": {branch: {path: entry}}, "branches": {branch: commit_info}}
        self.cache = {}

    def get_download_url(self, branch, path):
        """Return the raw-content URL of *path* on *branch*."""
        return f"https://raw.githubusercontent.com/{self.name}/{branch}/{path}"

    @property
    def repository_name(self):
        """The part of ``self.name`` after the first ``/``."""
        return self.name.split("/")[1]

    def get_tree(self, branch) -> dict:
        """Fetch the recursive git tree of *branch* and cache it.

        Returns a dict mapping each entry's path to the entry as returned by
        the API, or ``None`` when the response status is not 200. Raises
        ``Exception`` when a 200 response carries an API ``message``.
        """
        # Drop only this branch's entry: a failed fetch must not leave a stale
        # tree behind, but trees cached for other branches stay valid.
        tree_cache = self.cache.setdefault("tree", {})
        tree_cache.pop(branch, None)
        request = requests.get(f"https://api.github.com/repos/{self.name}/git/trees/{branch}?recursive=true")
        if request.status_code != 200:
            return None
        r = request.json()
        self.__verify_request(r)
        tmp_tree = {entry["path"]: entry for entry in r["tree"]}
        tree_cache[branch] = tmp_tree
        return tmp_tree

    def get_size(self, branch):
        """Return the summed ``size`` of all tree entries of *branch*.

        Uses the cached tree when present, otherwise fetches it. Entries
        without a ``size`` count as 0. Returns 0 when the tree could not be
        fetched. The value is the uncompressed size.
        """
        cacheres = self.get_cache("tree", branch)
        tree = cacheres if cacheres is not None else self.get_tree(branch)
        if tree is None:
            return 0
        return sum(entry.get("size") or 0 for entry in tree.values())  # Uncompressed

    def compare_tree(self, branch):
        """Recreate every tree entry of *branch* that is missing locally.

        Paths are checked with ``os.path.exists`` as given (i.e. relative to
        the current working directory). Missing directories are created with
        ``utils.mkdirs``; missing files are reported through ``locales`` and
        downloaded with ``utils.download_file_with_bar``. Does nothing when
        the tree could not be fetched.
        """
        tree = self.get_tree(branch)
        if tree is None:
            return
        for file, entry in tree.items():
            if os.path.exists(file):
                continue
            if entry["type"] == "tree":
                utils.mkdirs(file)
            else:
                locales.adv_print("FILE_IS_MISSING", variables={"file": file})
                utils.download_file_with_bar(self.get_download_url(branch, file), file)

    def get_cache(self, *args):
        """Look up a nested value in ``self.cache`` by a path of keys.

        Walks the keys in order and returns the value reached at the last
        key. The walk stops early, returning whatever was found (``None`` for
        a missing key), as soon as a non-dict value is reached. Returns
        ``None`` when called without keys.
        """
        node = self.cache
        last_index = len(args) - 1
        for counter, key in enumerate(args):
            node = node.get(key)
            if not isinstance(node, dict) or counter == last_index:
                return node
        return None

    def get_commit_info(self, commit):
        """Fetch the API commit object for *commit*.

        Raises ``Exception`` when the response carries an API ``message``.
        """
        commit_info = requests.get(f"https://api.github.com/repos/{self.name}/commits/{commit}").json()
        self.__verify_request(commit_info)
        return commit_info

    def get_branches(self):
        """Fetch all branches together with their head commit info.

        Returns a dict keyed by a display label (branch name plus the
        localized age of its head commit in days) with the commit info as
        value. The same commit info is cached under
        ``cache["branches"][branch_name]``. One extra API request is made per
        branch. Raises ``Exception`` when a response carries an API
        ``message``.
        """
        tmp_branches = {}
        branches = requests.get(f"https://api.github.com/repos/{self.name}/branches").json()
        self.__verify_request(branches)
        self.cache["branches"] = {}
        for branch in branches:
            commit_info = self.get_commit_info(branch["commit"]["sha"])
            commit_datetime = datetime.strptime(commit_info["commit"]["author"]["date"], "%Y-%m-%dT%H:%M:%S%z")
            # Compare in the commit's own timezone. Stripping the offset and
            # subtracting from local naive time skews the age by the local
            # UTC offset.
            delta = datetime.now(commit_datetime.tzinfo) - commit_datetime
            label = f"{branch['name']} [{locales.message('DAYS_AGO', {'days': delta.days})}]"
            tmp_branches[label] = commit_info
            self.cache["branches"][branch['name']] = commit_info
        return tmp_branches

    def get_version(self, branch):
        """Fetch and parse ``version.txt`` from the root of *branch*.

        Returns a ``Version``, or ``None`` when the response status is not 200.
        """
        request = requests.get(f"https://raw.githubusercontent.com/{self.name}/{branch}/version.txt")
        return Version(request.text) if request.status_code == 200 else None

    def get_latest_commit_hash(self, branch):
        """Return the SHA of the head commit of *branch*.

        Served from the branch cache when available; otherwise fetched from
        the API, returning ``None`` when the response status is not 200.
        """
        if (cacheres := self.get_cache("branches", branch)) is not None:
            return cacheres["sha"]
        request = requests.get(f"https://api.github.com/repos/{self.name}/branches/{branch}")
        if request.status_code != 200:
            return None
        r = request.json()
        self.__verify_request(r)
        return r["commit"]["sha"]

    def clone(self, branch):
        """Download and extract *branch*, renaming it to ``"<repo> <version>"``.

        Any existing directory with the target name is removed first.
        Raises ``Exception`` when ``version.txt`` cannot be fetched for the
        branch.
        """
        remote_version = self.get_version(branch)
        if remote_version is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise Exception(f"Could not fetch version.txt for branch '{branch}' of {self.name}")
        expected_path = f"{self.repository_name} {remote_version.version}"
        utils.download_file_and_extract(f"https://github.com/{self.name}/archive/{branch}.zip", f"{branch}.zip",
                                        self.get_size(branch))
        if os.path.exists(expected_path):
            utils.rmtree(expected_path)
        os.rename(f"{self.repository_name}-{branch}", expected_path)

    def diff_commits(self, base, head):
        """Report how far *head* is ahead of *base* and print recent commits.

        Prints the messages of the last ``settings["show_last_X_commits"]``
        commits of the comparison. Raises ``Exception`` when the response
        carries an API ``message``.
        """
        r = requests.get(f"https://api.github.com/repos/{self.name}/compare/{base}...{head}").json()
        self.__verify_request(r)
        show_commits = settings["show_last_X_commits"]
        locales.adv_print("COMMIT_DIFF_RESULTS", variables={"ahead_commits": r["ahead_by"], "last_count": show_commits})
        # A count of 0 would slice [-0:], i.e. the whole list; show nothing instead.
        recent_commits = r["commits"][-show_commits:] if show_commits > 0 else []
        for commit in recent_commits:
            print(f"[+] {commit['commit']['message']}")

    def __verify_request(self, r):
        """Raise ``Exception`` if *r* is a dict carrying a ``message`` value."""
        if isinstance(r, dict) and r.get("message") is not None:
            raise Exception(r.get("message"))