From 96e76dfc40a25a9a0dd6a4931ec630c1a353ab35 Mon Sep 17 00:00:00 2001 From: Jerry Date: Fri, 6 Sep 2019 14:59:45 +0800 Subject: [PATCH] repod: rewrite push --- buildbot.py | 102 +++++++++++++++++++++++++++++------------- repod.py | 124 ++++++++++++++++++++++++++++++---------------------- 2 files changed, 144 insertions(+), 82 deletions(-) diff --git a/buildbot.py b/buildbot.py index 2d45051..a864514 100755 --- a/buildbot.py +++ b/buildbot.py @@ -87,7 +87,7 @@ class jobsManager: ret += f'{myproperty}={getattr(self, myproperty, None)},' ret += ')' return ret - def reset_dir(self, pkgdirname=None, all=False): + def reset_dir(self, pkgdirname=None, all=False, rmpkg=True): if all: logger.info('resetting %s', str(REPO_ROOT)) bash(GIT_RESET_SUBDIR, cwd=REPO_ROOT) @@ -102,16 +102,36 @@ class jobsManager: if fpath.is_dir() and \ fpath.name in ('pkg', 'src'): rmtree(fpath) - elif fpath.is_file() and \ + elif rmpkg and fpath.is_file() and \ (fpath.name.endswith(PKG_SUFFIX) or \ fpath.name.endswith(PKG_SIG_SUFFIX)): fpath.unlink() else: return False return True - def force_upload_package(self, pkgdirname, overwrite=True): - pass - def rebuild_package(self, pkgdirname, clean=False): + def force_upload_package(self, pkgdirname, overwrite=False): + if not self.idle: + logger.debug('force_upload requested and not idle.') + if not (REPO_ROOT / pkgdirname).exists(): + ret = f'force_upload failed: no such dir {pkgdirname}' + logger.warning(ret) + else: + self.pkgconfigs = load_all_yaml() + updates = updmgr.check_update(rebuild_package=pkgdirname) + if updates and len(updates) == 1: + (pkgconfig, ver, buildarchs) = updates[0] + fakejob = Job(buildarchs[0], pkgconfig, ver) + if self.__upload(fakejob, overwrite=overwrite): + ret = f'done force_upload {pkgdirname}' + logger.info(ret) + else: + ret = f'force_upload {pkgdirname} failed: return code.' + logger.warning(ret) + else: + ret = f'force_upload {pkgdirname} failed: cannot check update.' + logger.warning(ret) + return ret + def rebuild_package(self, pkgdirname, clean=True): if not self.idle: logger.debug('rebuild requested and not idle.') self.pkgconfigs = load_all_yaml() @@ -130,7 +150,7 @@ class jobsManager: ret = f'rebuild job added for {pkgdirname} {" ".join(buildarchs)}' logger.info(ret) else: - ret = 'rebuild failed: cannot check update.' + ret = f'rebuild {pkgdirname} failed: cannot check update.' logger.warning(ret) return ret def _new_buildjob(self, job): @@ -228,10 +248,10 @@ class jobsManager: for fpath in cwd.iterdir(): if fpath.name.endswith(PKG_SUFFIX): bash(f'{GPG_SIGN_CMD} {fpath.name}', cwd=cwd) - def __upload(self, job): - suc = True + def __upload(self, job, overwrite=False): cwd = REPO_ROOT / job.pkgconfig.dirname f_to_upload = list() + pkg_update_list = list() for fpath in cwd.iterdir(): if fpath.name.endswith(PKG_SUFFIX) and \ get_pkg_details_from_name(fpath.name).ver == job.version: @@ -239,36 +259,33 @@ class jobsManager: assert sigpath.exists() f_to_upload.append(sigpath) f_to_upload.append(fpath) + pkg_update_list.append(fpath) + sizes = [f.stat().st_size / 1000 / 1000 for f in f_to_upload] + max_tries = 10 + for tries in range(max_tries): + timeouts = rrun('push_start', args=(f_to_upload, sizes)) + if type(timeouts) is list: + break + else: + if tries + 1 < max_tries: + logger.warning(f'Remote is busy ({timeouts}), wait 1 min x10 [{tries+1}/10]') + sleep(60) + else: + raise RuntimeError('Remote is busy and cannot connect') + assert len(f_to_upload) == len(timeouts) + pkgs_timeouts = {f_to_upload[i]:timeouts[i] for i in range(len(sizes))} for f in f_to_upload: max_tries = 5 for tries in range(max_tries): try: - size = f.stat().st_size / 1000 / 1000 - if f.name.endswith(PKG_SUFFIX): - for _ in range(10): - timeout = rrun('push_start', args=(f.name, size)) - if timeout > 0: - break - else: - logger.warning('Remote is busy (-1), wait 1 min x10') - sleep(60) - else: - timeout = 60 + timeout = pkgs_timeouts.get(f) logger.info(f'Uploading {f}, timeout in {timeout}s') mon_bash(UPLOAD_CMD.format(src=f), seconds=int(timeout)) - if f.name.endswith(PKG_SUFFIX): - logger.info(f'Requesting repo update for {f.name}') - res = rrun('push_done', args=(f.name,), kwargs={'overwrite': False,}) - if res is None: - logger.info(f'Update success for {f.name}') - else: - logger.error(f'Update failed for {f.name}, reason: {res}') - suc = False except Exception: time_to_sleep = (tries + 1) * 60 logger.error(f'We are getting problem uploading {f}, wait {time_to_sleep} secs') if not rrun('push_fail', args=(f.name,)): - logger.error('unable to run push_fail') + logger.error('Unable to run push_fail') print_exc_plus() if tries + 1 < max_tries: sleep(time_to_sleep) @@ -277,7 +294,31 @@ class jobsManager: else: logger.error(f'Upload {f} failed, abort.') raise RuntimeError('Unable to upload some files') - return suc + logger.info(f'Requesting repo update for {pkg_update_list}') + res = "unexpected" + max_tries = 5 + for tries in range(max_tries): + try: + res = rrun('push_done', args=(f_to_upload,), kwargs={'overwrite': overwrite,}) + except Exception: + time_to_sleep = (tries + 1) * 60 + logger.info(f'Error updating {pkg_update_list}, wait {time_to_sleep} secs') + print_exc_plus() + if tries + 1 < max_tries: + sleep(time_to_sleep) + else: + break + else: + ret = f'Update failed for {pkg_update_list}: max reties exceeded' + logger.error(ret) + raise RuntimeError(ret) + if res is None: + logger.info(f'Update success for {pkg_update_list}') + else: + ret = f'Update failed for {pkg_update_list}, reason: {res}' + logger.error(ret) + raise RuntimeError(ret) + return res is None def getup(self): ''' check for updates now !!! @@ -396,7 +437,8 @@ class updateManager: logger.info(f'checking update: {pkg.dirname}') if self.__pkgerrs.get(pkg.dirname, 0) >= 2: logger.warning(f'package: {pkg.dirname} too many failures checking update') - continue + if rebuild_package is None: + continue pkgbuild = pkgdir / 'PKGBUILD' archs = get_arch_from_pkgbuild(pkgbuild) buildarchs = [BUILD_ARCH_MAPPING.get(arch, None) for arch in archs] diff --git a/repod.py b/repod.py index 2a8ca04..05108c0 100755 --- a/repod.py +++ b/repod.py @@ -13,6 +13,7 @@ import os from config import REPOD_BIND_ADDRESS, REPOD_BIND_PASSWD, REPO_PUSH_BANDWIDTH, \ GPG_VERIFY_CMD +from shared_vars import PKG_SUFFIX, PKG_SIG_SUFFIX @@ -32,27 +33,34 @@ configure_logger(logger, logfile='repod.log', rotate_size=1024*1024*10, enable_n class pushFm: def __init__(self): - self.fname = None + self.fnames = list() self.size = None + self.sizes = None self.start_time = None self.end_time = None - def start(self, fname, size): + def start(self, fnames, sizes): ''' - size is in MB + sizes is list in MB returns -1 when busy ''' if self.is_busy(): return -1 - self.fname = fname + self.fnames = fnames self.start_time = time() + self.sizes = sizes + size = 0 + for s in sizes: + size += s self.size = size - if size <= 7.5: - timeout = 120 - self.end_time = self.start_time + 120 - else: - timeout = size / (REPO_PUSH_BANDWIDTH / 8) * 2 - self.end_time = self.start_time + timeout - return timeout + def get_timeout(size): + if size <= 7.5: + timeout = 120 + else: + timeout = size / (REPO_PUSH_BANDWIDTH / 8) * 2 + return timeout + timeouts = [get_timeout(s) for s in sizes] + self.end_time = self.start_time + get_timeout(self.size) + return timeouts def tick(self): ''' return None means success @@ -60,77 +68,89 @@ class pushFm: ''' if self.is_busy(): if time() > self.end_time: - ret = f'file {self.fname} is supposed to finish at {self.end_time}' + ret = f'files {self.fnames} are supposed to finish at {self.end_time}' self.__init__() - logger.error(f'pfm: {ret}') + logger.error(f'tick: {ret}') return ret else: return None else: return None - def fail(self, fname): + def fail(self, tfname): update_path = Path('updates') - if fname == self.fname: - pkg = update_path / self.fname - sig = update_path / f'{self.fname}.sig' - for f in (pkg, sig): - if f.exists(): - try: - f.unlink() - except Exception: - logger.warning(f'unable to remove {f.name}') + if tfname in self.fnames: + for fname in self.fnames: + pkg = update_path / fname + sig = update_path / f'{fname}.sig' + for f in (pkg, sig): + if f.exists(): + try: + f.unlink() + except Exception: + logger.warning(f'unable to remove {f.name}') self.__init__() return None else: return "Wrong file" - def done(self, fname, overwrite=False): + def done(self, fnames, overwrite=False): ''' return None means success else returns an error string ''' - if fname == self.fname: + if [f for f in fnames if not (f.endswith(PKG_SUFFIX) or f.endswith(PKG_SIG_SUFFIX))]: + return "file to upload are garbage" + filter_sig = lambda fnames:[fname for fname in fnames if not fname.endswith(PKG_SIG_SUFFIX)] + if sorted(filter_sig(fnames)) == sorted(filter_sig(self.fnames)): try: update_path = Path('updates') - pkg_found = False - sig_found = False - for fpath in update_path.iterdir(): - if fpath.is_dir(): - continue - if fpath.name == self.fname: - pkg_found = fpath - elif fpath.name == f'{self.fname}.sig': - sig_found = fpath - if pkg_found and sig_found: - try: - bash(f'{GPG_VERIFY_CMD} {sig_found} {pkg_found}') - except CalledProcessError: - print_exc_plus() - return 'GPG verify error' - else: + for pkgfname in filter_sig(fnames): + pkg_found = False + sig_found = False + for fpath in update_path.iterdir(): + if fpath.is_dir(): + continue + if fpath.name == pkgfname: + pkg_found = fpath + elif fpath.name == f'{pkgfname}.sig': + sig_found = fpath + if pkg_found and sig_found: try: - if update(overwrite=overwrite): - return None - except Exception: + bash(f'{GPG_VERIFY_CMD} {sig_found} {pkg_found}') + except CalledProcessError: + ret = f'{pkg_found} GPG verify error' + logger.error(ret) print_exc_plus() - return 'update error' + return ret + else: + try: + if update(overwrite=overwrite): + continue + else: + raise RuntimeError('update return false') + except Exception: + print_exc_plus() + return f'{pkg_found} update error' + else: + return f'file missing: pkg {pkg_found} sig {sig_found}' + return "unexpected error" else: - return f'file missing: pkg {pkg_found} sig {sig_found}' - return "unexpected error" + # success + return None finally: self.__init__() else: return "Wrong file" def is_busy(self): - return not (self.fname is None) + return bool(self.fnames) pfm = pushFm() -def push_start(filename, size): +def push_start(filenames, sizes): pfm.tick() - return pfm.start(filename, size) + return pfm.start(filenames, sizes) -def push_done(filename, overwrite=False): - return pfm.done(filename, overwrite=overwrite) +def push_done(filenames, overwrite=False): + return pfm.done(filenames, overwrite=overwrite) def push_fail(filename): return pfm.fail(filename)