美文网首页
python批量下载gitlab项目 支持群组

python批量下载gitlab项目 支持群组

作者: 噫那里有条咸鱼 | 来源:发表于2022-05-09 18:26 被阅读0次
    #!/usr/bin/python3
    from urllib.request import urlopen
    import json
    import subprocess, shlex
    import time
    import os
    
    # 默认使用 http 拉取,非 ssh
    gitlabToken = 'kwxmuskShkUs-xxxxxxx'    # person token (ps: 只有第一次创建时可见,不是feed token)
    gitlabAddr  = 'xxxxxx.com'              # gitlab地址
    targets     = ['xxx','xxx']             # 所属群组 (ps: 为空克隆所有群组,慎用)
    withShared  = 'false'                   # 是否包含共享的项目,默认false (ps:为true会拉取到归属其他群组的项目)
    
    #-------------------------
    
    counter = 0
    procs = []
    
    def get_next(group_id):
        global counter
        global procs
    
        print('get_next group_id:', group_id)
        url = gen_next_url(group_id)
        allProjects     = urlopen(url)
        allProjectsDict = json.loads(allProjects.read().decode())
        if len(allProjectsDict) == 0:
            return
        for thisProject in allProjectsDict: 
            try:
                thisProjectURL  = thisProject['http_url_to_repo']
                thisProjectPath = thisProject['path_with_namespace']
                if os.path.exists(thisProjectPath):
                    command     = shlex.split('git -C "%s" pull' % (thisProjectPath))               
                else:
                    print("=========== 开始克隆 %s %s ===========" % (group_id, thisProject['name']))
                    print('执行:git clone %s %s' % (thisProjectURL, thisProjectPath))                        
                    command     = shlex.split('git clone %s %s' % (thisProjectURL, thisProjectPath))
                    
                proc  = subprocess.Popen(command)
                procs.append(proc)
                time.sleep(1)
                counter += 1
    
            except Exception as e:
                print("Error on %s: %s" % (thisProjectURL, e.strerror))
    
        print("=========== 等待子线程执行结束 ===========")
        for p in procs:
            p.wait()
            p.kill()
        print("=========== 子线程执行结束 ===========")
        return
    
    def have_next_projects(group_id):
        url = gen_next_url(group_id)
        allProjects     = urlopen(url)
        allProjectsDict = json.loads(allProjects.read().decode())
        if len(allProjectsDict) == 0:
            return False
        return True
    
    
    def get_sub_groups(parent_id):
        url = gen_subgroups_url(parent_id)
        allProjects     = urlopen(url)
        allProjectsDict = json.loads(allProjects.read().decode())
        sub_ids = []
        if len(allProjectsDict) == 0:
            return sub_ids
        for thisProject in allProjectsDict: 
            try:
                id = thisProject['id']
                sub_ids.append(id)
            except Exception as e:
                print("Error on %s: %s" % (id, e.strerror))
        return sub_ids
    
    def cal_next_sub_groupids(parent_id):
        parent = ''
        parent = parent_id
        
        is_start = 1
        parent_list = []
        sub_ids = get_sub_groups(parent_id)
        print('cal_next_sub_groupids sub_ids and parent_id:',sub_ids, parent_id)
        ok = have_next_projects(parent_id) 
        print('have_next_projects result:', ok)
        if len(sub_ids)!=0 and ok == False:
            for i in range(len(sub_ids)):
                print('cal_next_sub_groupids sub_ids[i]:', sub_ids[i])
                parent = sub_ids[i]
                a = cal_next_sub_groupids(sub_ids[i])
                return a
        if len(sub_ids) !=0 and ok == True:
            for i in range(len(sub_ids)):
                print('cal_next_sub_groupids parent:', parent)
                parent = sub_ids[i]
                parent_list.append(sub_ids[i])
                a = cal_next_sub_groupids(sub_ids[i])
                parent_list.extend(a)
        if len(sub_ids) == 0 and ok == True:
            print('cal_next_sub_groupids is_start:',is_start)
            parent_list.append(parent)
            return parent_list
        if len(sub_ids) ==0 and ok == False:
            return parent_list
        return parent_list
    
    def download_code(parent_id):
        data =cal_next_sub_groupids(parent_id)
        print('download_code result: ',data)
        for group_id in data:
            get_next(group_id)
        return
     
    def gen_next_url(target_id):
        return "https://%s/api/v4/groups/%s/projects?private_token=%s&with_shared=%s&order_by=updated_at" % (gitlabAddr, target_id, gitlabToken, withShared)
    
    def gen_subgroups_url(target_id):
        return "https://%s/api/v4/groups/%s/subgroups?private_token=%s" % (gitlabAddr, target_id, gitlabToken)
    
    def gen_global_url():
        return "http://%s/api/v4/projects?private_token=%s" % (gitlabAddr, gitlabToken)
    
    def download_global_code():
        global counter
        global procs
    
        url = gen_global_url()
        allProjects     = urlopen(url)
        allProjectsDict = json.loads(allProjects.read().decode())
        if len(allProjectsDict) == 0:
            return
        for thisProject in allProjectsDict: 
            try:
                thisProjectURL  = thisProject['http_url_to_repo']
                thisProjectPath = thisProject['path_with_namespace']
                print(thisProjectURL + ' ' + thisProjectPath)
                
                if os.path.exists(thisProjectPath):       
                    command     = shlex.split('git -C "%s" pull' % (thisProjectPath))
                else:
                    print("=========== 开始克隆 %s %s ===========" % (group_id, thisProject['name']))
                    print('执行:git clone %s %s' % (thisProjectURL, thisProjectPath))    
                    command     = shlex.split('git clone %s %s' % (thisProjectURL, thisProjectPath))
    
                proc  = subprocess.Popen(command)
                procs.append(proc)
                time.sleep(1)
                counter += 1
                
            except Exception as e:
                print("Error on %s: %s" % (thisProjectURL, e.strerror))
    
        print("=========== 等待子线程执行结束 ===========")
        for p in procs:
            p.wait()
            p.kill()
        print("=========== 子线程执行结束 ===========")
        return
    
    def download_targets_code():
        for target in targets:
            url     = "https://%s/api/v4/groups?private_token=%s&search=%s" % (gitlabAddr, gitlabToken, target)
            allProjects     = urlopen(url)
            allProjectsDict = json.loads(allProjects.read().decode())
            if len(allProjectsDict) == 0:
                return
            target_id = '' 
            for thisProject in allProjectsDict: 
                try:
                    this_name = thisProject['name']
                    if target == this_name:
                        target_id = thisProject['id']
                        break
                except Exception as e:
                    print("Error on %s: %s" % (this_name, e.strerror))    
            download_code(target_id)
        return
    
    def main():
        if len(targets) == 0:
            download_global_code()
        else:
            download_targets_code()
        
        print("=========== 执行结束,克隆项目数: %s ===========" % (counter))
        return
    
    
    main()
    

    相关文章

      网友评论

          本文标题:python批量下载gitlab项目 支持群组

          本文链接:https://www.haomeiwen.com/subject/isohurtx.html