Files: 1dcfe4fd1aed14f6f72db8cc6c2e11b82be39c04 / verify-merge.py
9538 bytes · Raw
1 | #!/usr/bin/env python3 |
2 | import argparse |
3 | import os |
4 | import subprocess |
5 | import glob |
6 | import sys |
7 | |
8 | GIT = os.getenv('GIT', 'git') |
9 | GPG = os.getenv('GPG', 'gpg') |
10 | GITIAN_PUBKEYS_DIR = os.getenv('GITIAN_PUBKEYS_DIR', 'gitian-pubkeys') |
11 | |
def verify():
    """Run the full verification pass over the gitian assert files.

    Optionally imports and/or refreshes GPG pubkeys first, then checks
    file-path naming, detached-signature validity, and checksum
    consistency between signers — for one version if ``-v`` was given,
    otherwise for every build directory found.
    """
    global args, workdir
    if args.import_keys:
        import_gpg_keys()
    if args.refresh_keys:
        refresh_gpg_keys()
    # Restrict the shell glob to a single version when requested,
    # otherwise match every v0* build directory.
    pattern = args.version or 'v0*'
    signatures = set(glob.glob(pattern + '-*/*/*.assert.sig'))
    asserts = get_assert_file_list(pattern)
    known_users = get_user_names_from_keys()
    verify_file_path_naming(asserts, signatures, known_users)
    verify_gpg_sigs(signatures)
    verify_checksums(asserts)
    print('All checks passed.')
    os.chdir(workdir)
28 | |
def main():
    """Entry point: parse arguments, then dispatch.

    A supplied pull-request id (-p) triggers the fetch/merge path;
    otherwise a plain verification pass runs from the current directory.
    """
    global args, workdir
    args = get_parsed_args()
    workdir = os.getcwd()
    # PEP 8: compare against None with 'is not', never '!='.
    if args.pull_id is not None:
        pull_request()
    else:
        verify()
37 | |
def get_parsed_args():
    """Declare the command-line options and return the parsed namespace."""
    # (flag names, add_argument keyword args) — kept in one table so the
    # option list is easy to scan and extend.
    option_table = [
        (('-p', '--pull_id'),
         dict(dest='pull_id', help='GitHub Pull request id to check')),
        (('-r', '--remote'),
         dict(dest='remote', default='upstream', help='The git remote repository')),
        (('-t', '--target-branch'),
         dict(dest='target_branch', default='master', help='Remote repository merge into branch')),
        (('-m', '--merge'),
         dict(action='store_true', dest='merge', help='Merge the given pull request id')),
        (('-k', '--refresh-keys'),
         dict(action='store_true', dest='refresh_keys', help='Refresh all public keys that are currently in the gpg keyring.')),
        (('-i', '--import-keys'),
         dict(action='store_true', dest='import_keys', help='Import all public keys in the gitian-pubkeys directory to the gpg keyring.')),
        (('-o', '--no-verify'),
         dict(action='store_true', dest='no_verify', help='Do not run any signature verification')),
        (('-v', '--version'),
         dict(dest='version', help='Version number of sigs to be verified (defaults to all versions if not specified).')),
    ]
    parser = argparse.ArgumentParser(
        usage='%(prog)s [options]',
        description='Use this script to verify the signatures of existing gitian assert files and / or assert files in a specific pull request.')
    for flags, kwargs in option_table:
        parser.add_argument(*flags, **kwargs)
    return parser.parse_args()
49 | |
def pull_request():
    """Fetch a pull request's head, optionally merge it, then verify.

    Fetches ``pull/<id>/head`` from the configured remote into a
    temporary local branch. With ``--merge``, hard-resets the target
    branch to the remote's state and creates a GPG-signed, no-ff merge
    commit. Unless ``--no-verify`` was given, runs the full verification
    pass afterwards, then deletes the temporary branch.
    """
    global args
    # Get branch from remote pull request and compare
    head_branch = args.pull_id + '_head'
    subprocess.check_call([GIT, 'fetch', args.remote])
    subprocess.check_call([GIT, 'checkout', args.remote + '/' + args.target_branch])
    subprocess.check_call([GIT, 'fetch', '-q', args.remote, 'pull/' + args.pull_id + '/head:' + head_branch])
    subprocess.check_call([GIT, 'checkout', '-f', head_branch])
    if args.merge:
        # Hard reset the target branch to the remote's state and merge the pull request's head branch into it
        subprocess.check_call([GIT, 'checkout', args.target_branch])
        subprocess.check_call([GIT, 'reset', '--hard', args.remote + '/' + args.target_branch])
        print('Merging and signing pull request #' + args.pull_id + ' , if you are using a smartcard, confirm the signature now.')
        subprocess.check_call([GIT, 'merge', '-q', '--commit', '--no-edit', '-m', 'Merge pull request #' + args.pull_id + ' into ' + args.target_branch, '--no-ff', '--gpg-sign', head_branch])
    if not args.no_verify:
        verify()
    # Fix: return to the configured target branch rather than a hard-coded
    # 'master', so -t/--target-branch works for the whole flow, then drop
    # the temporary PR head branch.
    subprocess.check_call([GIT, 'checkout', args.target_branch])
    subprocess.check_call([GIT, 'branch', '-D', head_branch])
68 | |
def refresh_gpg_keys():
    """Refresh every pubkey already in the local gpg keyring from its keyserver."""
    print('Refreshing pubkeys...')
    # Use the documented long option; '--refresh' is only an undocumented
    # alias of '--refresh-keys' in the gpg man page.
    subprocess.check_call([GPG, '--refresh-keys'])
72 | |
def import_gpg_keys():
    """Import every ``*.asc`` pubkey from the gitian pubkeys directory
    into the local gpg keyring.
    """
    print('Importing gpg pubkeys...')
    # Glob with the directory prefix instead of os.chdir(): the old
    # chdir/chdir-back pair left the process in the wrong directory if
    # gpg failed in between, and mutated global state needlessly.
    # sorted() makes the import order deterministic.
    for key_path in sorted(glob.glob(os.path.join(GITIAN_PUBKEYS_DIR, '*.asc'))):
        subprocess.check_call([GPG, '--import', key_path])
80 | |
def get_assert_file_list(ver_pattern):
    """Collect metadata for every assert file matching ``ver_pattern``.

    Globs ``<ver_pattern>-*/*/*.assert`` relative to the current
    directory and returns a path-sorted list of dicts with keys:
    ``release_full`` (e.g. ``v0.15.0.1-linux``), ``release_num``
    (``v0.15.0.1``), ``platform`` (``linux``), ``path``, ``user``
    (the signer folder name), and ``version_major`` (``15``).
    """
    assert_files = []
    for assert_file in sorted(glob.glob(ver_pattern + '-*/*/*.assert')):
        pieces = assert_file.split('/')
        release_full = pieces[0]  # eg v0.15.0.1-linux
        # Split on the first '-' only: a platform name that itself
        # contains a dash would make a bare split('-') raise ValueError.
        release_num, platform = release_full.split('-', 1)
        # Second dotted component of the version, e.g. '15' in v0.15.0.1.
        version_major = release_num.split('.')[1]
        assert_files.append({
            'release_full': release_full,
            'release_num': release_num,
            'platform': platform,
            'path': assert_file,
            'user': pieces[1],
            'version_major': version_major})
    return assert_files
96 | |
def verify_gpg_sigs(sig_file_paths):
    """Check every detached signature with gpg; exit(1) if any fails."""
    print('Verifying signatures:')
    any_failed = False
    for path in sig_file_paths:
        # Left-pad the file name into a 72-character column so the
        # [OK] markers line up.
        print(' - ' + '{message: <{fill}}'.format(message=path, fill='72'), end='')
        outcome = verify_gpg_sig(path)
        if outcome.returncode == 0:
            print(' [OK]')
        else:
            any_failed = True
            print('\n')
            sys.stderr.write('ERROR:\n' + outcome.stderr + '-' * 80 + '\n')
    if any_failed:
        sys.stderr.write('ERROR: One or more signatures failed verification.\n')
        exit(1)
    print('All signatures verified correctly.\n')
113 | |
def verify_file_path_naming(assert_files, sig_file_paths, user_names):
    """Validate the naming and pairing of assert and sig files.

    Ensures every sig has an assert, every assert has a sig, every
    signer folder matches a known pubkey, and every path follows the
    canonical ``<version>-<platform>/<user>/monero-...-build.assert``
    layout. Exits with status 1 on the first violation.
    """
    expected_fmt = '{release_num}-{platform}/{user}/monero-{platform}-0.{version_major}-build.assert'
    print('Verifying file path naming...')
    # More sigs than asserts means at least one sig lacks its assert file.
    if len(sig_file_paths) > len(assert_files):
        sys.stderr.write("ERROR: One or more sig files doesn't have a matching assert file:\n")
        known_paths = [entry['path'] for entry in assert_files]
        for sig in sig_file_paths:
            if os.path.splitext(sig)[0] not in known_paths:
                sys.stderr.write(" - {0}\n".format(sig))
        exit(1)
    for entry in assert_files:
        # Every assert file must carry a detached signature.
        if (entry['path'] + '.sig') not in sig_file_paths:
            sys.stderr.write('ERROR: Assert file found without corresponding sig file:\n' + entry['path'] + '\n')
            exit(1)
        # The signer folder must correspond to a pubkey in the repo.
        if entry['user'] not in user_names:
            sys.stderr.write("ERROR: User '{user}' doesn't have a matching PGP key. Expected {folder}/{user}.asc\n".format(user=entry['user'], folder=GITIAN_PUBKEYS_DIR))
            sys.stderr.write(" * Found in path: {path}\n".format(path=entry['path']))
            exit(1)
        # The whole path must match the canonical layout exactly.
        if expected_fmt.format(**entry) != entry['path']:
            sys.stderr.write('ERROR: File path appears to be incorrect:\n{actual}\nExpected:\n{expected}\n'.format(actual=entry['path'], expected=expected_fmt.format(**entry)))
            exit(1)
    print('All file paths seem to be correct.\n')
141 | |
def get_user_names_from_keys():
    """Return signer names derived from the ``*.asc`` pubkey filenames.

    A key file ``gitian-pubkeys/alice.asc`` yields the name ``alice``.
    """
    # Glob with the directory prefix instead of the chdir/chdir-back
    # dance: it never mutates the process working directory and simply
    # yields an empty list if the directory is absent.
    return [os.path.splitext(os.path.basename(path))[0]
            for path in glob.glob(os.path.join(GITIAN_PUBKEYS_DIR, '*.asc'))]
147 | |
def verify_gpg_sig(sig_file):
    """Run ``gpg --verify`` on one sig file and return the CompletedProcess.

    stdout/stderr are captured as text so callers can inspect
    ``returncode`` and ``stderr``.

    TODO: Verify correct user created the signature.
    """
    command = [GPG, '--verify', sig_file]
    return subprocess.run(command, capture_output=True, encoding='utf-8')
151 | |
def verify_checksums(assert_files):
    """Compare assert-file contents between signers of the same build.

    ``assert_files`` is the path-sorted list produced by
    get_assert_file_list(); sorting guarantees all entries for one
    release/platform are adjacent, so the first signer's file serves as
    the reference that each following signer's file is compared against,
    line by line, up to the ``base_manifests`` section. Exits with
    status 1 on the first mismatching line.
    """
    print('Beginning binary checksum comparison...\n')
    # Check that the contents between the assertion signers match.
    # This is meant for quick verification, not for validation of their contents.
    # TODO: prevent false positives related to filenames / whitespace / formatting.
    prev_release_num = ''
    prev_release_full = ''
    prev_platform = ''
    for assert_file in assert_files:
        release_full = assert_file['release_full']
        if release_full != prev_release_full:
            # First file for this release/platform: remember its signer and
            # load its contents as the reference, print the release header
            # when the release number changes, then move to the next file.
            first_user = assert_file['user']
            first_file = assert_file['path']
            prev_release_full = release_full
            if prev_release_num != assert_file['release_num']:
                print(' ' + assert_file['release_num'])
                prev_release_num = assert_file['release_num']
            f = open(first_file, 'r')
            first_file_contents = f.readlines()
            f.close()
            continue
        platform = assert_file['platform']
        if platform != prev_platform:
            # New platform group: print the platform header and the
            # reference signer's name once.
            prev_platform = platform
            print(' ' + platform)
            print(' ' + first_user)
        print(' ' + assert_file['user'])
        assert_file_handle = open(assert_file['path'], 'r')
        assert_file_contents = assert_file_handle.readlines()
        assert_file_handle.close()
        for i in range(len(assert_file_contents)):
            # Compare each line in the assertion file until base_manifests:
            if assert_file_contents[i] == '- base_manifests: !!omap\n':
                break
            # The OSX SDK may change from time to time:
            if 'sdk' in assert_file_contents[i]:
                continue
            if assert_file_contents[i] != first_file_contents[i]:
                sys.stderr.write('ERROR: Found conflicting contents on line: ' + str(i) + ' of file ')
                sys.stderr.write(assert_file['path'] + ':\n' + assert_file_contents[i])
                sys.stderr.write(first_file + ':\n' + first_file_contents[i])
                exit(1)
    print('No discrepancies found in assertion files.')
195 | |
# Script entry point: only run when executed directly, not when imported.
if __name__ == '__main__':
    main()
198 |
Built with git-ssb-web