Module continuous_delivery_scripts.utils.git_helpers
Utility script to abstract git operations for our CI scripts.
Expand source code
#
# Copyright (C) 2020-2025 Arm Limited or its affiliates and Contributors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
"""Utility script to abstract git operations for our CI scripts."""
import logging
import os
import re
import shutil
from git import Repo, Actor, GitCommandError
from packaging import version
from pathlib import Path
from typing import Optional, List, Union, Any, Tuple
from .configuration import configuration, ConfigurationVariable
from .filesystem_helpers import TemporaryDirectory
logger = logging.getLogger(__name__)
class GitWrapper:
"""Wrapper class to provide convenient methods for performing git actions."""
def __init__(self, path: Path, repo: Repo) -> None:
"""Creates an instance of GitWrapper.
Args:
path: path to repository.
repo: GitPython repository.
"""
self._root_path = path
self.repo = repo
self.author = Actor(
configuration.get_value(ConfigurationVariable.BOT_USERNAME),
configuration.get_value(ConfigurationVariable.BOT_EMAIL),
)
def _git_url_ssh_to_https(self, url: str) -> str:
"""Changes repository URL to use authorisation token.
Converts the git url to use the GitHub token:
See https://github.blog/2012-09-21-easier-builds-and-deployments-using-git-over-https-and-oauth/
Returns:
new URL
"""
path = url.split("github.com", 1)[1][1:].strip()
new = "https://{GITHUB_TOKEN}:x-oauth-basic@github.com/%s" % path
logger.info("rewriting git url to: %s" % new)
return new.format(GITHUB_TOKEN=configuration.get_value(ConfigurationVariable.GIT_TOKEN))
def clone(self, path: Path) -> "GitWrapper":
"""Clones this repository to the path.
Args:
path: path where to put the clone
Returns:
a wrapper over the cloned repository
"""
try:
git_clone = self.repo.clone_from(
url=self.get_remote_url(), to_path=str(path), multi_options=["--recurse-submodules"]
)
except GitCommandError as e:
logger.info("failed cloning repository: %s" % e)
logger.info("trying with authentication")
git_clone = self.repo.clone_from(
url=self._git_url_ssh_to_https(self.get_remote_url()),
to_path=str(path),
multi_options=["--recurse-submodules"],
)
clone = GitWrapper(path=path, repo=git_clone)
clone.set_remote_url(self.get_remote_url())
clone.fetch()
return clone
@property
def root(self) -> Path:
"""Gets repository root folder.
Returns:
repository root folder.
"""
return self._root_path
def configure_author(self) -> None:
"""Sets the author."""
self.repo.config_writer().set_value("user", "name", self.author.name).release()
self.repo.config_writer().set_value("user", "email", self.author.email).release()
def checkout_branch(self, branch_name: str) -> Any:
"""Checks out a branch from its name.
Args:
branch_name: name of the branch
Returns:
corresponding branch if found; None otherwise.
"""
logger.debug(f"Checking out {branch_name}")
local_branch = self.get_branch(branch_name)
if local_branch:
self.checkout(local_branch)
return local_branch
def checkout(self, branch: Any) -> None:
"""Checks out a branch.
Args:
branch: branch to check out
"""
self.repo.git.checkout(branch)
def _add_one_file_or_one_dir(self, path: Path) -> None:
if not path:
raise ValueError("Unspecified path.")
self._add_one_path(Path(path))
def _add_one_path(self, path_model: Path) -> None:
if not path_model.is_absolute():
path_model = Path(self.root).joinpath(path_model)
if not path_model.exists():
logger.warning(f"[Git] {path_model} cannot be added because not found.")
return
relative_path = str(path_model.relative_to(self.root))
unix_relative_path = relative_path.replace("\\", "/")
if path_model.is_dir():
unix_relative_path = f"{unix_relative_path}/*"
logger.info(f"Adding {unix_relative_path} to repository.")
self.repo.git.add(unix_relative_path)
def add(self, path: Union[list, set, Path]) -> None:
"""Adds a file or a list of files.
Args:
path: file path or list of file paths
"""
if isinstance(path, list) or isinstance(path, set):
for element in path:
self.add(element)
else:
self._add_one_file_or_one_dir(path)
def commit(self, message: str, **kwargs: Optional[Tuple[str, Any]]) -> None:
"""Commits changes to the repository.
Args:
message: commit message
**kwargs: extra parameters
"""
logger.info("Committing changes")
self.repo.index.commit(message, author=self.author, **kwargs)
def get_master_branch(self) -> Any:
"""Gets the `master` branch.
Returns:
corresponding branch
"""
main = configuration.get_value(ConfigurationVariable.MASTER_BRANCH)
branch = self.get_branch(main)
if branch:
return branch
return self.get_remote_branch(main)
def get_beta_branch(self) -> Any:
"""Gets the `beta` branch.
Returns:
corresponding branch
"""
beta = configuration.get_value(ConfigurationVariable.BETA_BRANCH)
branch = self.get_branch(beta)
if branch:
return branch
return self.get_remote_branch(beta)
def is_release_branch(self, branch_name: Optional[str]) -> bool:
"""Checks whether the branch is a `release` branch or not.
Args:
branch_name: name of the branch
Returns:
True if the branch is used for `release` code; False otherwise
"""
branch_pattern = configuration.get_value(ConfigurationVariable.RELEASE_BRANCH_PATTERN)
is_release, _ = self._is_branch_of_type(branch_name, branch_pattern)
return is_release
def fetch(self) -> None:
"""Fetches latest changes."""
self.repo.git.fetch(all=True, tags=True, force=True)
def get_branch(self, branch_name: str) -> Any:
"""Gets a specific local branch.
Args:
branch_name: name of the branch to look for
Returns:
corresponding branch or `None`
if no branches with this `branch_name` were found
"""
branch = self._get_branch_reference(branch_name)
if branch:
return branch
self.fetch()
return self._get_branch_reference(branch_name)
def _get_branch_reference(self, branch_name: str) -> Any:
try:
return self.repo.heads[str(branch_name)]
except (IndexError, ValueError) as e:
logger.warning(e)
return None
def get_current_branch(self) -> Any:
"""Gets the current branch.
Returns:
the current branch
"""
# Workaround for this GitPython issue https://github.com/gitpython-developers/GitPython/issues/510
try:
return self.repo.active_branch
except TypeError as e:
logger.warning(f"Could not determine the branch name using GitPython: {e}")
current_branch = self._get_branch_from_advanced_feature()
if not current_branch:
current_branch = self._get_branch_from_abbreviation("HEAD")
return current_branch
def _get_branch_from_advanced_feature(self) -> Any:
if version.parse(self.git_version()) >= version.parse("2.22"):
current_branch = self.repo.git.branch(show_current=True)
current_branch = current_branch if isinstance(current_branch, str) else current_branch.decode("utf-8")
return self.get_branch(current_branch)
return None
def _get_branch_from_abbreviation(self, abbreviation: str) -> Any:
current_branch = self.repo.git.rev_parse("--abbrev-ref", abbreviation)
current_branch = current_branch if isinstance(current_branch, str) else current_branch.decode("utf-8")
return self.get_branch(current_branch.strip())
def get_commit_count(self) -> int:
"""Gets current commit count.
Gets a number stating how many commits would have been listed
before the current commit.
Returns:
number of commits before this current one.
"""
current_commit = self.get_current_commit()
return int(current_commit.count()) if current_commit else 0
def get_commit_hash(self) -> str:
"""Gets the hash of the current commit.
Returns:
a hash.
"""
return str(self.get_current_commit())
def get_current_commit(self) -> Any:
"""Gets the current commit.
Returns:
the current commit.
"""
return self.repo.commit(self.get_current_branch())
def get_branch_point(self, commit1: Any, commit2: Any) -> Any:
"""Finds the common ancestor.
See https://git-scm.com/docs/git-merge-base
Args:
commit1: commit1
commit2: commit2
Returns:
the branch point.
"""
return self.repo.merge_base(commit1, commit2).pop()
def merge(self, branch: Any) -> None:
"""Merges `branch` to current branch.
Args:
branch: branch to merge
"""
current_branch = self.get_current_branch()
merge_base = self.repo.merge_base(branch, current_branch)
self.repo.index.merge_tree(current_branch, base=merge_base)
self.commit(f"Merge from {str(branch)}", parent_commits=(branch.commit, current_branch.commit))
def get_remote_url(self) -> str:
"""Gets the URL of the remote repository.
Returns:
the corresponding URL.
"""
remote = self._get_remote()
if not remote:
raise ValueError("Undefined remote repository")
url = remote.url
if not url:
raise ValueError("Undefined remote repository URL")
return str(url)
def cherry_pick(self, commit: Any) -> None:
"""Cherry picks a specific commit.
Args:
commit: commit to cherry pick
"""
self.repo.git.cherry_pick(str(commit))
def set_remote_url(self, url: str) -> None:
"""Sets the URL of the remote repository.
Args:
url: URL
"""
remote = self._get_remote()
if remote:
self.repo.delete_remote(str(remote))
self.repo.create_remote(configuration.get_value(ConfigurationVariable.REMOTE_ALIAS), url=url)
def get_remote_branch(self, branch_name: str) -> Optional[Any]:
"""Gets the branch present in the remote repository.
Args:
branch_name: name of the branch
Returns:
corresponding branch if exists. `None` otherwise
"""
remote = self._get_remote()
if not remote:
return None
try:
return remote.refs[str(branch_name)]
except (IndexError, ValueError) as e:
logger.warning(e)
return None
def set_upstream_branch(self, branch_name: str) -> None:
"""Sets the upstream branch of the current branch.
Args:
branch_name: name of the remote branch.
"""
if self.remote_branch_exists(branch_name):
self.repo.git.branch("--set-upstream-to", self.get_remote_branch(branch_name))
def delete_branch(self, branch: Any) -> None:
"""Deletes a branch.
Args:
branch: branch to delete
"""
self.repo.delete_head(branch)
def list_branches(self) -> list:
"""Gets the list of branches.
Returns:
list of branches
"""
return [b for b in self.repo.heads]
def branch_exists(self, branch_name: str) -> bool:
"""Checks whether a branch in the repository exists.
Args:
branch_name: name of the branch
Returns:
True if there is a branch called `branch_name`; False otherwise
"""
return self.get_branch(branch_name) is not None
def remote_branch_exists(self, branch_name: str) -> bool:
"""Checks whether a branch in the remote repository exists.
Args:
branch_name: name of the branch
Returns:
True if there is a remote branch called `branch_name`; False otherwise
"""
return self.get_remote_branch(branch_name) is not None
def _get_specific_changes(self, change_type: Optional[str], commit1: Any, commit2: Any) -> List[str]:
diff = None
if commit1:
diff = commit1.diff(commit2) if commit2 else commit1.diff()
elif commit2:
diff = commit2.diff()
if not diff:
return []
if change_type:
change_type = change_type.upper()
change_type = change_type if change_type in diff.change_type else None
diff_iterator = diff.iter_change_type(change_type) if change_type else diff
changes = [change.a_path if change.a_path else change.b_path for change in diff_iterator]
return changes
def get_changes_list(
self, commit1: Any, commit2: Any, change_type: Optional[str] = None, dir: Optional[str] = None
) -> List[str]:
"""Gets change list.
Gets a list of all the changes that happened between two commits:
list of the paths of the files which changed
Args:
commit1: commit
commit2: other commit
change_type: type of change e.g. 'A' for added files, 'D' for deleted files
dir: directory of interest. if None the whole repository is considered
Returns:
list of paths
"""
changes = self._get_specific_changes(change_type, commit1, commit2)
if dir:
windows_path = dir.replace("/", "\\")
linux_path = dir.replace("\\", "/")
return [change for change in changes if (linux_path in change) or (windows_path in change)]
else:
return changes
def pull_all(self) -> None:
"""Pulls all changes from all remotes."""
self.repo.git.pull(all=True, force=True, quiet=True)
def pull(self) -> None:
"""Pulls changes on current branch from the remote repository."""
if self.remote_branch_exists(self.get_current_branch()):
self.repo.git.pull(self._get_remote(), self.get_current_branch(), quiet=True)
def force_pull(self) -> None:
"""Force pulls changes from the remote repository."""
self.repo.git.pull(self._get_remote(), self.get_current_branch(), quiet=True, force=True)
def push(self) -> None:
"""Pushes commits.
Pushes changes to the remote repository.
Pushes also relevant annotated tags when pushing branches out.
"""
self.repo.git.push("--follow-tags", "--set-upstream", self._get_remote(), self.get_current_branch())
def push_tag(self) -> None:
"""Pushes commits and tags.
Pushes changes to the remote repository.
Tags are also pushed as part of the process
"""
self.repo.git.push(tags=True)
def force_push(self) -> None:
"""Pushes commits with force.
Performs a force push.
"""
self.repo.git.push(force=True)
def force_push_tag(self) -> None:
"""Pushes commits and tags with force.
Performs a force push.
Tags are also pushed as part of the process
"""
self.repo.git.push(force=True, tags=True)
def is_dirty(self) -> bool:
"""Determines whether repository is dirty.
Repository is considered dirty when git status returns elements which are not committed.
"""
return bool(self.repo.is_dirty(untracked_files=True))
def clean(self) -> None:
"""Cleans the repository.
Performs a force clean.
"""
if self.is_dirty():
self.repo.git.clean(force=True, x=True, d=True)
def stash(self) -> None:
"""Stashes the repository.
Performs a stash.
"""
self.repo.git.stash(all=True, quiet=True)
def configure_for_github(self) -> None:
"""Reconfigures the repository.
Configures the repository so that we can commit back to GitHub
"""
self.configure_author()
self.set_remote_url(self._git_url_ssh_to_https(self.get_remote_url()))
def create_tag(self, tag_name: str, message: Optional[str] = None) -> Any:
"""Creates a new tag.
Args:
tag_name: name of the tag
message: tag annotation (https://git-scm.com/book/en/v2/Git-Basics-Tagging#_annotated_tags)
Returns:
corresponding tag
"""
return self.repo.create_tag(tag_name, message=message, force=True)
def create_branch(self, branch_name: str) -> Any:
"""Creates a new branch.
Args:
branch_name: name of the branch
Returns:
corresponding branch
"""
logger.info(f"Creating branch {branch_name}")
return self.repo.create_head(branch_name)
def git_version(self) -> str:
"""Gets git version.
Returns:
the version of git in use.
"""
return ".".join([str(element) for element in self.repo.git.version_info])
def _get_remote(self) -> Optional[Any]:
try:
return self.repo.remote(configuration.get_value(ConfigurationVariable.REMOTE_ALIAS))
except (IndexError, ValueError) as e:
logger.warning(e)
return None
def list_files_added_to_current_commit(self) -> List[str]:
"""Returns a list of files added in the current commit."""
current_commit = self.repo.head.commit
previous_commit = self.repo.commit("HEAD~1")
if not current_commit:
current_commit = self.get_current_commit()
return self.get_changes_list(previous_commit, current_commit, change_type="a")
def list_files_added_on_current_branch(self) -> List[str]:
"""Returns a list of files changed against master branch."""
master_branch = self.get_master_branch()
beta_branch = self.get_beta_branch()
master_branch_commit = self.repo.commit(master_branch)
beta_branch_commit = self.repo.commit(beta_branch)
current_branch_commit = self.repo.commit(self.get_current_branch())
# Finding the baseline branch to consider
master_branch_point = self.repo.commit(self.get_branch_point(master_branch_commit, current_branch_commit))
beta_branch_point = self.repo.commit(self.get_branch_point(beta_branch_commit, current_branch_commit))
branch_point = master_branch_point
if not master_branch:
branch_point = beta_branch_point
elif beta_branch and master_branch:
if beta_branch_point.committed_datetime > master_branch_point.committed_datetime:
# The branch point off `beta` is more recent than off `master`.
# Hence, the difference between current and `beta` should be considered.
branch_point = beta_branch_point
return self.get_changes_list(branch_point, current_branch_commit, change_type="a")
def is_current_branch_feature(self) -> bool:
"""Returns boolean indicating if current branch is considered a feature."""
current_branch = self.get_current_branch()
is_master = current_branch == self.get_master_branch()
is_beta = current_branch == self.get_beta_branch()
is_release = self.is_release_branch(current_branch)
return not (is_master or is_beta or is_release)
def is_current_branch_of_type(self, pattern: str) -> Tuple[bool, Optional[List[Any]]]:
"""Returns boolean indicating whether the current branch follows the pattern and the list of groups if any."""
return self._is_branch_of_type(self.get_current_branch(), pattern)
def _is_branch_of_type(
self, branch_name: Optional[str], pattern: Optional[str]
) -> Tuple[bool, Optional[List[Any]]]:
if not pattern:
return False, None
if not branch_name:
return False, None
match = re.search(pattern, str(branch_name))
return True if match else False, list(match.groups()) if match else None
@property
def uncommitted_changes(self) -> List[Path]:
"""Gets list of uncommitted files.
Returns:
list of uncommitted
"""
status = self.repo.git.status(porcelain=True, untracked_files=True)
if not status:
return []
return [Path(self.root).joinpath(line.strip().split(" ")[-1]) for line in status.splitlines()]
@property
def uncommitted_staged_changes(self) -> List[Path]:
"""Gets list of uncommitted staged changes.
Returns:
list of uncommitted staged changes
"""
staged = self.repo.git.diff(staged=True, name_only=True)
if not staged:
return []
return [Path(self.root).joinpath(line.strip()) for line in staged.splitlines()]
@staticmethod
def _apply_modifications(destination: Path, modified_file: Path) -> None:
logger.info(f"Applying change in {modified_file} to {destination}")
if not destination.parent.exists():
os.makedirs(str(destination.parent), exist_ok=True)
shutil.copy2(src=str(modified_file), dst=str(destination))
@staticmethod
def _apply_deletions(destination: Path) -> None:
logger.info(f"Removing {destination}")
if destination.exists():
destination.unlink()
def apply_uncommitted_changes(self, other_repo: "GitWrapper") -> None:
"""Applies the uncommitted changes found in current repository to another.
Args:
other_repo: repository to apply changes to
"""
dest_root = other_repo.root
for f in self.uncommitted_changes:
destination = dest_root.joinpath(f.relative_to(self.root))
if f.exists():
GitWrapper._apply_modifications(destination, f)
else:
GitWrapper._apply_deletions(destination)
for f in self.uncommitted_staged_changes:
other_repo.add(f.relative_to(self.root))
def get_corresponding_path(self, path_in_initial_repo: Path) -> Path:
"""Gets the path in current repository corresponding to path in initial repository.
If current repository is not a clone, then identical absolute path is returned.
Args:
path_in_initial_repo: path to a file/directory in initial repository.
Returns:
corresponding path.
"""
return (
path_in_initial_repo
if path_in_initial_repo.is_absolute()
else Path(self.root).joinpath(path_in_initial_repo)
)
class ProjectGitWrapper(GitWrapper):
"""Wrapper class over project's repository."""
def __init__(self) -> None:
"""Creates a Git Wrapper."""
super().__init__(
path=Path(configuration.get_value(ConfigurationVariable.PROJECT_ROOT)),
repo=Repo(configuration.get_value(ConfigurationVariable.PROJECT_ROOT)),
)
class LocalProjectRepository:
"""Context manager providing a git wrapper over the current project's repository."""
def __init__(self) -> None:
"""Constructor."""
self._repo = ProjectGitWrapper()
def __enter__(self) -> ProjectGitWrapper:
"""Context manager entry point."""
return self._repo
def __exit__(self, type: Any, value: Any, traceback: Any) -> None:
"""Context manager exit point."""
pass
class GitClone(GitWrapper):
"""Cloned repository.
It behaves exactly like the repository it is based on but
is in a completely different location.
"""
def __init__(self, path: Path, initial_path: Path, repo: Repo) -> None:
"""Creates an instance of GitWrapper.
Args:
path: path to repository.
repo: GitPython repository.
initial_path: path to the repository the clone is based on.
"""
super().__init__(path, repo)
self._initial_path = initial_path
@staticmethod
def wrap(repo: GitWrapper, initial_location: Path) -> "GitClone":
"""Wraps around around a repository."""
return GitClone(repo=repo.repo, path=repo.root, initial_path=initial_location)
@property
def initial_location(self) -> Path:
"""Gets the path to the repository it is based on."""
return self._initial_path
def _add_one_path(self, path_model: Path) -> None:
super()._add_one_path(self.get_corresponding_path(path_model))
def get_corresponding_path(self, path_in_initial_repo: Path) -> Path:
"""Gets the path in cloned repository corresponding to path in initial repository.
Args:
path_in_initial_repo: path to a file/directory in initial repository.
Returns:
corresponding path.
"""
if not path_in_initial_repo.is_absolute():
return Path(self.root).joinpath(path_in_initial_repo)
try:
# Tyring to find if the path corresponds to a file/directory present in intial repository
return Path(self.root).joinpath(path_in_initial_repo.relative_to(self.initial_location))
except ValueError:
return path_in_initial_repo
class GitTempClone:
"""Context manager providing a temporary cloned repository."""
def __init__(self, desired_branch_name: Optional[str], repository_to_clone: GitWrapper):
"""Constructor.
Args:
desired_branch_name: the branch to consider. I
repository_to_clone: the repository to clone. If not specified, the project repository will be used.
"""
self._temporary_dir = TemporaryDirectory()
logger.info(f"Creating a temporary repository in {self._temporary_dir}")
self._repo = repository_to_clone
_current_branch_name = desired_branch_name if desired_branch_name else str(self._repo.get_current_branch())
self._clone = GitClone.wrap(self._repo.clone(self._temporary_dir.path), initial_location=self._repo.root)
self._clone.checkout(_current_branch_name)
self._repo.apply_uncommitted_changes(self._clone)
def __enter__(self) -> GitClone:
"""Context manager entry point."""
return self._clone
def __exit__(self, type: Any, value: Any, traceback: Any) -> None:
"""Context manager exit point.
As described in
https://github.com/gitpython-developers/GitPython/blob/60acfa5d8d454a7c968640a307772902d211f043/git/repo/base.py#L223,
Tempfiles objects on Windows are holding references to open files until
they are collected by the garbage collector, thus preventing deletion.
"""
self._repo.repo.close()
self._temporary_dir.cleanup()
class ProjectTempClone(GitTempClone):
"""Temporary cloned repository for current project."""
def __init__(self, desired_branch_name: Optional[str] = None):
"""Constructor.
Args:
desired_branch_name: the branch to consider. if not specified, the
system will try to identify the current branch in the repository which
will work in most cases but probably not on CI.
"""
super().__init__(desired_branch_name=desired_branch_name, repository_to_clone=ProjectGitWrapper())
Classes
class GitClone (path: pathlib.Path, initial_path: pathlib.Path, repo: git.repo.base.Repo)
-
Cloned repository.
It behaves exactly like the repository it is based on but is in a completely different location.
Creates an instance of GitWrapper.
Args
path
- path to repository.
repo
- GitPython repository.
initial_path
- path to the repository the clone is based on.
Expand source code
class GitClone(GitWrapper): """Cloned repository. It behaves exactly like the repository it is based on but is in a completely different location. """ def __init__(self, path: Path, initial_path: Path, repo: Repo) -> None: """Creates an instance of GitWrapper. Args: path: path to repository. repo: GitPython repository. initial_path: path to the repository the clone is based on. """ super().__init__(path, repo) self._initial_path = initial_path @staticmethod def wrap(repo: GitWrapper, initial_location: Path) -> "GitClone": """Wraps around around a repository.""" return GitClone(repo=repo.repo, path=repo.root, initial_path=initial_location) @property def initial_location(self) -> Path: """Gets the path to the repository it is based on.""" return self._initial_path def _add_one_path(self, path_model: Path) -> None: super()._add_one_path(self.get_corresponding_path(path_model)) def get_corresponding_path(self, path_in_initial_repo: Path) -> Path: """Gets the path in cloned repository corresponding to path in initial repository. Args: path_in_initial_repo: path to a file/directory in initial repository. Returns: corresponding path. """ if not path_in_initial_repo.is_absolute(): return Path(self.root).joinpath(path_in_initial_repo) try: # Tyring to find if the path corresponds to a file/directory present in intial repository return Path(self.root).joinpath(path_in_initial_repo.relative_to(self.initial_location)) except ValueError: return path_in_initial_repo
Ancestors
Static methods
def wrap(repo: GitWrapper, initial_location: pathlib.Path) ‑> GitClone
-
Wraps around around a repository.
Expand source code
@staticmethod def wrap(repo: GitWrapper, initial_location: Path) -> "GitClone": """Wraps around around a repository.""" return GitClone(repo=repo.repo, path=repo.root, initial_path=initial_location)
Instance variables
var initial_location : pathlib.Path
-
Gets the path to the repository it is based on.
Expand source code
@property def initial_location(self) -> Path: """Gets the path to the repository it is based on.""" return self._initial_path
Methods
def get_corresponding_path(self, path_in_initial_repo: pathlib.Path) ‑> pathlib.Path
-
Gets the path in cloned repository corresponding to path in initial repository.
Args
path_in_initial_repo
- path to a file/directory in initial repository.
Returns
corresponding path.
Expand source code
def get_corresponding_path(self, path_in_initial_repo: Path) -> Path: """Gets the path in cloned repository corresponding to path in initial repository. Args: path_in_initial_repo: path to a file/directory in initial repository. Returns: corresponding path. """ if not path_in_initial_repo.is_absolute(): return Path(self.root).joinpath(path_in_initial_repo) try: # Tyring to find if the path corresponds to a file/directory present in intial repository return Path(self.root).joinpath(path_in_initial_repo.relative_to(self.initial_location)) except ValueError: return path_in_initial_repo
Inherited members
GitWrapper
:add
apply_uncommitted_changes
branch_exists
checkout
checkout_branch
cherry_pick
clean
clone
commit
configure_author
configure_for_github
create_branch
create_tag
delete_branch
fetch
force_pull
force_push
force_push_tag
get_beta_branch
get_branch
get_branch_point
get_changes_list
get_commit_count
get_commit_hash
get_current_branch
get_current_commit
get_master_branch
get_remote_branch
get_remote_url
git_version
is_current_branch_feature
is_current_branch_of_type
is_dirty
is_release_branch
list_branches
list_files_added_on_current_branch
list_files_added_to_current_commit
merge
pull
pull_all
push
push_tag
remote_branch_exists
root
set_remote_url
set_upstream_branch
stash
uncommitted_changes
uncommitted_staged_changes
class GitTempClone (desired_branch_name: Optional[str], repository_to_clone: GitWrapper)
-
Context manager providing a temporary cloned repository.
Constructor.
Args
desired_branch_name
- the branch to consider. I
repository_to_clone
- the repository to clone. If not specified, the project repository will be used.
Expand source code
class GitTempClone: """Context manager providing a temporary cloned repository.""" def __init__(self, desired_branch_name: Optional[str], repository_to_clone: GitWrapper): """Constructor. Args: desired_branch_name: the branch to consider. I repository_to_clone: the repository to clone. If not specified, the project repository will be used. """ self._temporary_dir = TemporaryDirectory() logger.info(f"Creating a temporary repository in {self._temporary_dir}") self._repo = repository_to_clone _current_branch_name = desired_branch_name if desired_branch_name else str(self._repo.get_current_branch()) self._clone = GitClone.wrap(self._repo.clone(self._temporary_dir.path), initial_location=self._repo.root) self._clone.checkout(_current_branch_name) self._repo.apply_uncommitted_changes(self._clone) def __enter__(self) -> GitClone: """Context manager entry point.""" return self._clone def __exit__(self, type: Any, value: Any, traceback: Any) -> None: """Context manager exit point. As described in https://github.com/gitpython-developers/GitPython/blob/60acfa5d8d454a7c968640a307772902d211f043/git/repo/base.py#L223, Tempfiles objects on Windows are holding references to open files until they are collected by the garbage collector, thus preventing deletion. """ self._repo.repo.close() self._temporary_dir.cleanup()
Subclasses
class GitWrapper (path: pathlib.Path, repo: git.repo.base.Repo)
-
Wrapper class to provide convenient methods for performing git actions.
Creates an instance of GitWrapper.
Args
path
- path to repository.
repo
- GitPython repository.
Expand source code
class GitWrapper: """Wrapper class to provide convenient methods for performing git actions.""" def __init__(self, path: Path, repo: Repo) -> None: """Creates an instance of GitWrapper. Args: path: path to repository. repo: GitPython repository. """ self._root_path = path self.repo = repo self.author = Actor( configuration.get_value(ConfigurationVariable.BOT_USERNAME), configuration.get_value(ConfigurationVariable.BOT_EMAIL), ) def _git_url_ssh_to_https(self, url: str) -> str: """Changes repository URL to use authorisation token. Converts the git url to use the GitHub token: See https://github.blog/2012-09-21-easier-builds-and-deployments-using-git-over-https-and-oauth/ Returns: new URL """ path = url.split("github.com", 1)[1][1:].strip() new = "https://{GITHUB_TOKEN}:x-oauth-basic@github.com/%s" % path logger.info("rewriting git url to: %s" % new) return new.format(GITHUB_TOKEN=configuration.get_value(ConfigurationVariable.GIT_TOKEN)) def clone(self, path: Path) -> "GitWrapper": """Clones this repository to the path. Args: path: path where to put the clone Returns: a wrapper over the cloned repository """ try: git_clone = self.repo.clone_from( url=self.get_remote_url(), to_path=str(path), multi_options=["--recurse-submodules"] ) except GitCommandError as e: logger.info("failed cloning repository: %s" % e) logger.info("trying with authentication") git_clone = self.repo.clone_from( url=self._git_url_ssh_to_https(self.get_remote_url()), to_path=str(path), multi_options=["--recurse-submodules"], ) clone = GitWrapper(path=path, repo=git_clone) clone.set_remote_url(self.get_remote_url()) clone.fetch() return clone @property def root(self) -> Path: """Gets repository root folder. Returns: repository root folder. """ return self._root_path def configure_author(self) -> None: """Sets the author.""" self.repo.config_writer().set_value("user", "name", self.author.name).release() self.repo.config_writer().set_value("user", "email", self.author.email).release() def checkout_branch(self, branch_name: str) -> Any: """Checks out a branch from its name. Args: branch_name: name of the branch Returns: corresponding branch if found; None otherwise. """ logger.debug(f"Checking out {branch_name}") local_branch = self.get_branch(branch_name) if local_branch: self.checkout(local_branch) return local_branch def checkout(self, branch: Any) -> None: """Checks out a branch. Args: branch: branch to check out """ self.repo.git.checkout(branch) def _add_one_file_or_one_dir(self, path: Path) -> None: if not path: raise ValueError("Unspecified path.") self._add_one_path(Path(path)) def _add_one_path(self, path_model: Path) -> None: if not path_model.is_absolute(): path_model = Path(self.root).joinpath(path_model) if not path_model.exists(): logger.warning(f"[Git] {path_model} cannot be added because not found.") return relative_path = str(path_model.relative_to(self.root)) unix_relative_path = relative_path.replace("\\", "/") if path_model.is_dir(): unix_relative_path = f"{unix_relative_path}/*" logger.info(f"Adding {unix_relative_path} to repository.") self.repo.git.add(unix_relative_path) def add(self, path: Union[list, set, Path]) -> None: """Adds a file or a list of files. Args: path: file path or list of file paths """ if isinstance(path, list) or isinstance(path, set): for element in path: self.add(element) else: self._add_one_file_or_one_dir(path) def commit(self, message: str, **kwargs: Optional[Tuple[str, Any]]) -> None: """Commits changes to the repository. Args: message: commit message **kwargs: extra parameters """ logger.info("Committing changes") self.repo.index.commit(message, author=self.author, **kwargs) def get_master_branch(self) -> Any: """Gets the `master` branch. Returns: corresponding branch """ main = configuration.get_value(ConfigurationVariable.MASTER_BRANCH) branch = self.get_branch(main) if branch: return branch return self.get_remote_branch(main) def get_beta_branch(self) -> Any: """Gets the `beta` branch. Returns: corresponding branch """ beta = configuration.get_value(ConfigurationVariable.BETA_BRANCH) branch = self.get_branch(beta) if branch: return branch return self.get_remote_branch(beta) def is_release_branch(self, branch_name: Optional[str]) -> bool: """Checks whether the branch is a `release` branch or not. Args: branch_name: name of the branch Returns: True if the branch is used for `release` code; False otherwise """ branch_pattern = configuration.get_value(ConfigurationVariable.RELEASE_BRANCH_PATTERN) is_release, _ = self._is_branch_of_type(branch_name, branch_pattern) return is_release def fetch(self) -> None: """Fetches latest changes.""" self.repo.git.fetch(all=True, tags=True, force=True) def get_branch(self, branch_name: str) -> Any: """Gets a specific local branch. Args: branch_name: name of the branch to look for Returns: corresponding branch or `None` if no branches with this `branch_name` were found """ branch = self._get_branch_reference(branch_name) if branch: return branch self.fetch() return self._get_branch_reference(branch_name) def _get_branch_reference(self, branch_name: str) -> Any: try: return self.repo.heads[str(branch_name)] except (IndexError, ValueError) as e: logger.warning(e) return None def get_current_branch(self) -> Any: """Gets the current branch. Returns: the current branch """ # Workaround for this GitPython issue https://github.com/gitpython-developers/GitPython/issues/510 try: return self.repo.active_branch except TypeError as e: logger.warning(f"Could not determine the branch name using GitPython: {e}") current_branch = self._get_branch_from_advanced_feature() if not current_branch: current_branch = self._get_branch_from_abbreviation("HEAD") return current_branch def _get_branch_from_advanced_feature(self) -> Any: if version.parse(self.git_version()) >= version.parse("2.22"): current_branch = self.repo.git.branch(show_current=True) current_branch = current_branch if isinstance(current_branch, str) else current_branch.decode("utf-8") return self.get_branch(current_branch) return None def _get_branch_from_abbreviation(self, abbreviation: str) -> Any: current_branch = self.repo.git.rev_parse("--abbrev-ref", abbreviation) current_branch = current_branch if isinstance(current_branch, str) else current_branch.decode("utf-8") return self.get_branch(current_branch.strip()) def get_commit_count(self) -> int: """Gets current commit count. Gets a number stating how many commits would have been listed before the current commit. Returns: number of commits before this current one. """ current_commit = self.get_current_commit() return int(current_commit.count()) if current_commit else 0 def get_commit_hash(self) -> str: """Gets the hash of the current commit. Returns: a hash. """ return str(self.get_current_commit()) def get_current_commit(self) -> Any: """Gets the current commit. Returns: the current commit. """ return self.repo.commit(self.get_current_branch()) def get_branch_point(self, commit1: Any, commit2: Any) -> Any: """Finds the common ancestor. See https://git-scm.com/docs/git-merge-base Args: commit1: commit1 commit2: commit2 Returns: the branch point. """ return self.repo.merge_base(commit1, commit2).pop() def merge(self, branch: Any) -> None: """Merges `branch` to current branch. Args: branch: branch to merge """ current_branch = self.get_current_branch() merge_base = self.repo.merge_base(branch, current_branch) self.repo.index.merge_tree(current_branch, base=merge_base) self.commit(f"Merge from {str(branch)}", parent_commits=(branch.commit, current_branch.commit)) def get_remote_url(self) -> str: """Gets the URL of the remote repository. Returns: the corresponding URL. """ remote = self._get_remote() if not remote: raise ValueError("Undefined remote repository") url = remote.url if not url: raise ValueError("Undefined remote repository URL") return str(url) def cherry_pick(self, commit: Any) -> None: """Cherry picks a specific commit. Args: commit: commit to cherry pick """ self.repo.git.cherry_pick(str(commit)) def set_remote_url(self, url: str) -> None: """Sets the URL of the remote repository. Args: url: URL """ remote = self._get_remote() if remote: self.repo.delete_remote(str(remote)) self.repo.create_remote(configuration.get_value(ConfigurationVariable.REMOTE_ALIAS), url=url) def get_remote_branch(self, branch_name: str) -> Optional[Any]: """Gets the branch present in the remote repository. Args: branch_name: name of the branch Returns: corresponding branch if exists. `None` otherwise """ remote = self._get_remote() if not remote: return None try: return remote.refs[str(branch_name)] except (IndexError, ValueError) as e: logger.warning(e) return None def set_upstream_branch(self, branch_name: str) -> None: """Sets the upstream branch of the current branch. Args: branch_name: name of the remote branch. """ if self.remote_branch_exists(branch_name): self.repo.git.branch("--set-upstream-to", self.get_remote_branch(branch_name)) def delete_branch(self, branch: Any) -> None: """Deletes a branch. Args: branch: branch to delete """ self.repo.delete_head(branch) def list_branches(self) -> list: """Gets the list of branches. Returns: list of branches """ return [b for b in self.repo.heads] def branch_exists(self, branch_name: str) -> bool: """Checks whether a branch in the repository exists. Args: branch_name: name of the branch Returns: True if there is a branch called `branch_name`; False otherwise """ return self.get_branch(branch_name) is not None def remote_branch_exists(self, branch_name: str) -> bool: """Checks whether a branch in the remote repository exists. Args: branch_name: name of the branch Returns: True if there is a remote branch called `branch_name`; False otherwise """ return self.get_remote_branch(branch_name) is not None def _get_specific_changes(self, change_type: Optional[str], commit1: Any, commit2: Any) -> List[str]: diff = None if commit1: diff = commit1.diff(commit2) if commit2 else commit1.diff() elif commit2: diff = commit2.diff() if not diff: return [] if change_type: change_type = change_type.upper() change_type = change_type if change_type in diff.change_type else None diff_iterator = diff.iter_change_type(change_type) if change_type else diff changes = [change.a_path if change.a_path else change.b_path for change in diff_iterator] return changes def get_changes_list( self, commit1: Any, commit2: Any, change_type: Optional[str] = None, dir: Optional[str] = None ) -> List[str]: """Gets change list. Gets a list of all the changes that happened between two commits: list of the paths of the files which changed Args: commit1: commit commit2: other commit change_type: type of change e.g. 'A' for added files, 'D' for deleted files dir: directory of interest. if None the whole repository is considered Returns: list of paths """ changes = self._get_specific_changes(change_type, commit1, commit2) if dir: windows_path = dir.replace("/", "\\") linux_path = dir.replace("\\", "/") return [change for change in changes if (linux_path in change) or (windows_path in change)] else: return changes def pull_all(self) -> None: """Pulls all changes from all remotes.""" self.repo.git.pull(all=True, force=True, quiet=True) def pull(self) -> None: """Pulls changes on current branch from the remote repository.""" if self.remote_branch_exists(self.get_current_branch()): self.repo.git.pull(self._get_remote(), self.get_current_branch(), quiet=True) def force_pull(self) -> None: """Force pulls changes from the remote repository.""" self.repo.git.pull(self._get_remote(), self.get_current_branch(), quiet=True, force=True) def push(self) -> None: """Pushes commits. Pushes changes to the remote repository. Pushes also relevant annotated tags when pushing branches out. """ self.repo.git.push("--follow-tags", "--set-upstream", self._get_remote(), self.get_current_branch()) def push_tag(self) -> None: """Pushes commits and tags. Pushes changes to the remote repository. Tags are also pushed as part of the process """ self.repo.git.push(tags=True) def force_push(self) -> None: """Pushes commits with force. Performs a force push. """ self.repo.git.push(force=True) def force_push_tag(self) -> None: """Pushes commits and tags with force. Performs a force push. Tags are also pushed as part of the process """ self.repo.git.push(force=True, tags=True) def is_dirty(self) -> bool: """Determines whether repository is dirty. Repository is considered dirty when git status returns elements which are not committed. """ return bool(self.repo.is_dirty(untracked_files=True)) def clean(self) -> None: """Cleans the repository. Performs a force clean. """ if self.is_dirty(): self.repo.git.clean(force=True, x=True, d=True) def stash(self) -> None: """Stashes the repository. Performs a stash. """ self.repo.git.stash(all=True, quiet=True) def configure_for_github(self) -> None: """Reconfigures the repository. Configures the repository so that we can commit back to GitHub """ self.configure_author() self.set_remote_url(self._git_url_ssh_to_https(self.get_remote_url())) def create_tag(self, tag_name: str, message: Optional[str] = None) -> Any: """Creates a new tag. Args: tag_name: name of the tag message: tag annotation (https://git-scm.com/book/en/v2/Git-Basics-Tagging#_annotated_tags) Returns: corresponding tag """ return self.repo.create_tag(tag_name, message=message, force=True) def create_branch(self, branch_name: str) -> Any: """Creates a new branch. Args: branch_name: name of the branch Returns: corresponding branch """ logger.info(f"Creating branch {branch_name}") return self.repo.create_head(branch_name) def git_version(self) -> str: """Gets git version. Returns: the version of git in use. """ return ".".join([str(element) for element in self.repo.git.version_info]) def _get_remote(self) -> Optional[Any]: try: return self.repo.remote(configuration.get_value(ConfigurationVariable.REMOTE_ALIAS)) except (IndexError, ValueError) as e: logger.warning(e) return None def list_files_added_to_current_commit(self) -> List[str]: """Returns a list of files added in the current commit.""" current_commit = self.repo.head.commit previous_commit = self.repo.commit("HEAD~1") if not current_commit: current_commit = self.get_current_commit() return self.get_changes_list(previous_commit, current_commit, change_type="a") def list_files_added_on_current_branch(self) -> List[str]: """Returns a list of files changed against master branch.""" master_branch = self.get_master_branch() beta_branch = self.get_beta_branch() master_branch_commit = self.repo.commit(master_branch) beta_branch_commit = self.repo.commit(beta_branch) current_branch_commit = self.repo.commit(self.get_current_branch()) # Finding the baseline branch to consider master_branch_point = self.repo.commit(self.get_branch_point(master_branch_commit, current_branch_commit)) beta_branch_point = self.repo.commit(self.get_branch_point(beta_branch_commit, current_branch_commit)) branch_point = master_branch_point if not master_branch: branch_point = beta_branch_point elif beta_branch and master_branch: if beta_branch_point.committed_datetime > master_branch_point.committed_datetime: # The branch point off `beta` is more recent than off `master`. # Hence, the difference between current and `beta` should be considered. branch_point = beta_branch_point return self.get_changes_list(branch_point, current_branch_commit, change_type="a") def is_current_branch_feature(self) -> bool: """Returns boolean indicating if current branch is considered a feature.""" current_branch = self.get_current_branch() is_master = current_branch == self.get_master_branch() is_beta = current_branch == self.get_beta_branch() is_release = self.is_release_branch(current_branch) return not (is_master or is_beta or is_release) def is_current_branch_of_type(self, pattern: str) -> Tuple[bool, Optional[List[Any]]]: """Returns boolean indicating whether the current branch follows the pattern and the list of groups if any.""" return self._is_branch_of_type(self.get_current_branch(), pattern) def _is_branch_of_type( self, branch_name: Optional[str], pattern: Optional[str] ) -> Tuple[bool, Optional[List[Any]]]: if not pattern: return False, None if not branch_name: return False, None match = re.search(pattern, str(branch_name)) return True if match else False, list(match.groups()) if match else None @property def uncommitted_changes(self) -> List[Path]: """Gets list of uncommitted files. Returns: list of uncommitted """ status = self.repo.git.status(porcelain=True, untracked_files=True) if not status: return [] return [Path(self.root).joinpath(line.strip().split(" ")[-1]) for line in status.splitlines()] @property def uncommitted_staged_changes(self) -> List[Path]: """Gets list of uncommitted staged changes. Returns: list of uncommitted staged changes """ staged = self.repo.git.diff(staged=True, name_only=True) if not staged: return [] return [Path(self.root).joinpath(line.strip()) for line in staged.splitlines()] @staticmethod def _apply_modifications(destination: Path, modified_file: Path) -> None: logger.info(f"Applying change in {modified_file} to {destination}") if not destination.parent.exists(): os.makedirs(str(destination.parent), exist_ok=True) shutil.copy2(src=str(modified_file), dst=str(destination)) @staticmethod def _apply_deletions(destination: Path) -> None: logger.info(f"Removing {destination}") if destination.exists(): destination.unlink() def apply_uncommitted_changes(self, other_repo: "GitWrapper") -> None: """Applies the uncommitted changes found in current repository to another. Args: other_repo: repository to apply changes to """ dest_root = other_repo.root for f in self.uncommitted_changes: destination = dest_root.joinpath(f.relative_to(self.root)) if f.exists(): GitWrapper._apply_modifications(destination, f) else: GitWrapper._apply_deletions(destination) for f in self.uncommitted_staged_changes: other_repo.add(f.relative_to(self.root)) def get_corresponding_path(self, path_in_initial_repo: Path) -> Path: """Gets the path in current repository corresponding to path in initial repository. If current repository is not a clone, then identical absolute path is returned. Args: path_in_initial_repo: path to a file/directory in initial repository. Returns: corresponding path. """ return ( path_in_initial_repo if path_in_initial_repo.is_absolute() else Path(self.root).joinpath(path_in_initial_repo) )
Subclasses
Instance variables
var root : pathlib.Path
-
Gets repository root folder.
Returns
repository root folder.
Expand source code
@property def root(self) -> Path: """Gets repository root folder. Returns: repository root folder. """ return self._root_path
var uncommitted_changes : List[pathlib.Path]
-
Gets list of uncommitted files.
Returns
list of uncommitted
Expand source code
@property def uncommitted_changes(self) -> List[Path]: """Gets list of uncommitted files. Returns: list of uncommitted """ status = self.repo.git.status(porcelain=True, untracked_files=True) if not status: return [] return [Path(self.root).joinpath(line.strip().split(" ")[-1]) for line in status.splitlines()]
var uncommitted_staged_changes : List[pathlib.Path]
-
Gets list of uncommitted staged changes.
Returns
list of uncommitted staged changes
Expand source code
@property def uncommitted_staged_changes(self) -> List[Path]: """Gets list of uncommitted staged changes. Returns: list of uncommitted staged changes """ staged = self.repo.git.diff(staged=True, name_only=True) if not staged: return [] return [Path(self.root).joinpath(line.strip()) for line in staged.splitlines()]
Methods
def add(self, path: Union[list, set, pathlib.Path]) ‑> None
-
Adds a file or a list of files.
Args
path
- file path or list of file paths
Expand source code
def add(self, path: Union[list, set, Path]) -> None: """Adds a file or a list of files. Args: path: file path or list of file paths """ if isinstance(path, list) or isinstance(path, set): for element in path: self.add(element) else: self._add_one_file_or_one_dir(path)
def apply_uncommitted_changes(self, other_repo: GitWrapper) ‑> None
-
Applies the uncommitted changes found in current repository to another.
Args
other_repo
- repository to apply changes to
Expand source code
def apply_uncommitted_changes(self, other_repo: "GitWrapper") -> None: """Applies the uncommitted changes found in current repository to another. Args: other_repo: repository to apply changes to """ dest_root = other_repo.root for f in self.uncommitted_changes: destination = dest_root.joinpath(f.relative_to(self.root)) if f.exists(): GitWrapper._apply_modifications(destination, f) else: GitWrapper._apply_deletions(destination) for f in self.uncommitted_staged_changes: other_repo.add(f.relative_to(self.root))
def branch_exists(self, branch_name: str) ‑> bool
-
Checks whether a branch in the repository exists.
Args
branch_name
- name of the branch
Returns
True if there is a branch called
branch_name
; False otherwiseExpand source code
def branch_exists(self, branch_name: str) -> bool: """Checks whether a branch in the repository exists. Args: branch_name: name of the branch Returns: True if there is a branch called `branch_name`; False otherwise """ return self.get_branch(branch_name) is not None
def checkout(self, branch: Any) ‑> None
-
Checks out a branch.
Args
branch
- branch to check out
Expand source code
def checkout(self, branch: Any) -> None: """Checks out a branch. Args: branch: branch to check out """ self.repo.git.checkout(branch)
def checkout_branch(self, branch_name: str) ‑> Any
-
Checks out a branch from its name.
Args
branch_name
- name of the branch
Returns
corresponding branch if found; None otherwise.
Expand source code
def checkout_branch(self, branch_name: str) -> Any: """Checks out a branch from its name. Args: branch_name: name of the branch Returns: corresponding branch if found; None otherwise. """ logger.debug(f"Checking out {branch_name}") local_branch = self.get_branch(branch_name) if local_branch: self.checkout(local_branch) return local_branch
def cherry_pick(self, commit: Any) ‑> None
-
Cherry picks a specific commit.
Args
commit
- commit to cherry pick
Expand source code
def cherry_pick(self, commit: Any) -> None: """Cherry picks a specific commit. Args: commit: commit to cherry pick """ self.repo.git.cherry_pick(str(commit))
def clean(self) ‑> None
-
Cleans the repository.
Performs a force clean.
Expand source code
def clean(self) -> None: """Cleans the repository. Performs a force clean. """ if self.is_dirty(): self.repo.git.clean(force=True, x=True, d=True)
def clone(self, path: pathlib.Path) ‑> GitWrapper
-
Clones this repository to the path.
Args
path
- path where to put the clone
Returns
a wrapper over the cloned repository
Expand source code
def clone(self, path: Path) -> "GitWrapper": """Clones this repository to the path. Args: path: path where to put the clone Returns: a wrapper over the cloned repository """ try: git_clone = self.repo.clone_from( url=self.get_remote_url(), to_path=str(path), multi_options=["--recurse-submodules"] ) except GitCommandError as e: logger.info("failed cloning repository: %s" % e) logger.info("trying with authentication") git_clone = self.repo.clone_from( url=self._git_url_ssh_to_https(self.get_remote_url()), to_path=str(path), multi_options=["--recurse-submodules"], ) clone = GitWrapper(path=path, repo=git_clone) clone.set_remote_url(self.get_remote_url()) clone.fetch() return clone
def commit(self, message: str, **kwargs: Optional[Tuple[str, Any]]) ‑> None
-
Commits changes to the repository.
Args
message
- commit message
**kwargs
- extra parameters
Expand source code
def commit(self, message: str, **kwargs: Optional[Tuple[str, Any]]) -> None: """Commits changes to the repository. Args: message: commit message **kwargs: extra parameters """ logger.info("Committing changes") self.repo.index.commit(message, author=self.author, **kwargs)
-
Sets the author.
Expand source code
def configure_author(self) -> None: """Sets the author.""" self.repo.config_writer().set_value("user", "name", self.author.name).release() self.repo.config_writer().set_value("user", "email", self.author.email).release()
def configure_for_github(self) ‑> None
-
Reconfigures the repository.
Configures the repository so that we can commit back to GitHub
Expand source code
def configure_for_github(self) -> None: """Reconfigures the repository. Configures the repository so that we can commit back to GitHub """ self.configure_author() self.set_remote_url(self._git_url_ssh_to_https(self.get_remote_url()))
def create_branch(self, branch_name: str) ‑> Any
-
Creates a new branch.
Args
branch_name
- name of the branch
Returns
corresponding branch
Expand source code
def create_branch(self, branch_name: str) -> Any: """Creates a new branch. Args: branch_name: name of the branch Returns: corresponding branch """ logger.info(f"Creating branch {branch_name}") return self.repo.create_head(branch_name)
def create_tag(self, tag_name: str, message: Optional[str] = None) ‑> Any
-
Creates a new tag.
Args
tag_name
- name of the tag
message
- tag annotation (https://git-scm.com/book/en/v2/Git-Basics-Tagging#_annotated_tags)
Returns
corresponding tag
Expand source code
def create_tag(self, tag_name: str, message: Optional[str] = None) -> Any: """Creates a new tag. Args: tag_name: name of the tag message: tag annotation (https://git-scm.com/book/en/v2/Git-Basics-Tagging#_annotated_tags) Returns: corresponding tag """ return self.repo.create_tag(tag_name, message=message, force=True)
def delete_branch(self, branch: Any) ‑> None
-
Deletes a branch.
Args
branch
- branch to delete
Expand source code
def delete_branch(self, branch: Any) -> None: """Deletes a branch. Args: branch: branch to delete """ self.repo.delete_head(branch)
def fetch(self) ‑> None
-
Fetches latest changes.
Expand source code
def fetch(self) -> None: """Fetches latest changes.""" self.repo.git.fetch(all=True, tags=True, force=True)
def force_pull(self) ‑> None
-
Force pulls changes from the remote repository.
Expand source code
def force_pull(self) -> None: """Force pulls changes from the remote repository.""" self.repo.git.pull(self._get_remote(), self.get_current_branch(), quiet=True, force=True)
def force_push(self) ‑> None
-
Pushes commits with force.
Performs a force push.
Expand source code
def force_push(self) -> None: """Pushes commits with force. Performs a force push. """ self.repo.git.push(force=True)
def force_push_tag(self) ‑> None
-
Pushes commits and tags with force.
Performs a force push. Tags are also pushed as part of the process
Expand source code
def force_push_tag(self) -> None: """Pushes commits and tags with force. Performs a force push. Tags are also pushed as part of the process """ self.repo.git.push(force=True, tags=True)
def get_beta_branch(self) ‑> Any
-
Gets the
beta
branch.Returns
corresponding branch
Expand source code
def get_beta_branch(self) -> Any: """Gets the `beta` branch. Returns: corresponding branch """ beta = configuration.get_value(ConfigurationVariable.BETA_BRANCH) branch = self.get_branch(beta) if branch: return branch return self.get_remote_branch(beta)
def get_branch(self, branch_name: str) ‑> Any
-
Gets a specific local branch.
Args
branch_name
- name of the branch to look for
Returns
corresponding branch or
None
if no branches with thisbranch_name
were foundExpand source code
def get_branch(self, branch_name: str) -> Any: """Gets a specific local branch. Args: branch_name: name of the branch to look for Returns: corresponding branch or `None` if no branches with this `branch_name` were found """ branch = self._get_branch_reference(branch_name) if branch: return branch self.fetch() return self._get_branch_reference(branch_name)
def get_branch_point(self, commit1: Any, commit2: Any) ‑> Any
-
Finds the common ancestor.
See https://git-scm.com/docs/git-merge-base
Args
commit1
- commit1
commit2
- commit2
Returns
the branch point.
Expand source code
def get_branch_point(self, commit1: Any, commit2: Any) -> Any: """Finds the common ancestor. See https://git-scm.com/docs/git-merge-base Args: commit1: commit1 commit2: commit2 Returns: the branch point. """ return self.repo.merge_base(commit1, commit2).pop()
def get_changes_list(self, commit1: Any, commit2: Any, change_type: Optional[str] = None, dir: Optional[str] = None) ‑> List[str]
-
Gets change list.
Gets a list of all the changes that happened between two commits: list of the paths of the files which changed
Args
commit1
- commit
commit2
- other commit
change_type
- type of change e.g. 'A' for added files, 'D' for deleted files
dir
- directory of interest. if None the whole repository is considered
Returns
list of paths
Expand source code
def get_changes_list( self, commit1: Any, commit2: Any, change_type: Optional[str] = None, dir: Optional[str] = None ) -> List[str]: """Gets change list. Gets a list of all the changes that happened between two commits: list of the paths of the files which changed Args: commit1: commit commit2: other commit change_type: type of change e.g. 'A' for added files, 'D' for deleted files dir: directory of interest. if None the whole repository is considered Returns: list of paths """ changes = self._get_specific_changes(change_type, commit1, commit2) if dir: windows_path = dir.replace("/", "\\") linux_path = dir.replace("\\", "/") return [change for change in changes if (linux_path in change) or (windows_path in change)] else: return changes
def get_commit_count(self) ‑> int
-
Gets current commit count.
Gets a number stating how many commits would have been listed before the current commit.
Returns
number of commits before this current one.
Expand source code
def get_commit_count(self) -> int: """Gets current commit count. Gets a number stating how many commits would have been listed before the current commit. Returns: number of commits before this current one. """ current_commit = self.get_current_commit() return int(current_commit.count()) if current_commit else 0
def get_commit_hash(self) ‑> str
-
Gets the hash of the current commit.
Returns
a hash.
Expand source code
def get_commit_hash(self) -> str: """Gets the hash of the current commit. Returns: a hash. """ return str(self.get_current_commit())
def get_corresponding_path(self, path_in_initial_repo: pathlib.Path) ‑> pathlib.Path
-
Gets the path in current repository corresponding to path in initial repository.
If current repository is not a clone, then identical absolute path is returned.
Args
path_in_initial_repo
- path to a file/directory in initial repository.
Returns
corresponding path.
Expand source code
def get_corresponding_path(self, path_in_initial_repo: Path) -> Path: """Gets the path in current repository corresponding to path in initial repository. If current repository is not a clone, then identical absolute path is returned. Args: path_in_initial_repo: path to a file/directory in initial repository. Returns: corresponding path. """ return ( path_in_initial_repo if path_in_initial_repo.is_absolute() else Path(self.root).joinpath(path_in_initial_repo) )
def get_current_branch(self) ‑> Any
-
Gets the current branch.
Returns
the current branch
Expand source code
def get_current_branch(self) -> Any: """Gets the current branch. Returns: the current branch """ # Workaround for this GitPython issue https://github.com/gitpython-developers/GitPython/issues/510 try: return self.repo.active_branch except TypeError as e: logger.warning(f"Could not determine the branch name using GitPython: {e}") current_branch = self._get_branch_from_advanced_feature() if not current_branch: current_branch = self._get_branch_from_abbreviation("HEAD") return current_branch
def get_current_commit(self) ‑> Any
-
Gets the current commit.
Returns
the current commit.
Expand source code
def get_current_commit(self) -> Any: """Gets the current commit. Returns: the current commit. """ return self.repo.commit(self.get_current_branch())
def get_master_branch(self) ‑> Any
-
Gets the
master
branch.Returns
corresponding branch
Expand source code
def get_master_branch(self) -> Any: """Gets the `master` branch. Returns: corresponding branch """ main = configuration.get_value(ConfigurationVariable.MASTER_BRANCH) branch = self.get_branch(main) if branch: return branch return self.get_remote_branch(main)
def get_remote_branch(self, branch_name: str) ‑> Optional[Any]
-
Gets the branch present in the remote repository.
Args
branch_name
- name of the branch
Returns
corresponding branch if exists.
None
otherwiseExpand source code
def get_remote_branch(self, branch_name: str) -> Optional[Any]: """Gets the branch present in the remote repository. Args: branch_name: name of the branch Returns: corresponding branch if exists. `None` otherwise """ remote = self._get_remote() if not remote: return None try: return remote.refs[str(branch_name)] except (IndexError, ValueError) as e: logger.warning(e) return None
def get_remote_url(self) ‑> str
-
Gets the URL of the remote repository.
Returns
the corresponding URL.
Expand source code
def get_remote_url(self) -> str: """Gets the URL of the remote repository. Returns: the corresponding URL. """ remote = self._get_remote() if not remote: raise ValueError("Undefined remote repository") url = remote.url if not url: raise ValueError("Undefined remote repository URL") return str(url)
def git_version(self) ‑> str
-
Gets git version.
Returns
the version of git in use.
Expand source code
def git_version(self) -> str: """Gets git version. Returns: the version of git in use. """ return ".".join([str(element) for element in self.repo.git.version_info])
def is_current_branch_feature(self) ‑> bool
-
Returns boolean indicating if current branch is considered a feature.
Expand source code
def is_current_branch_feature(self) -> bool: """Returns boolean indicating if current branch is considered a feature.""" current_branch = self.get_current_branch() is_master = current_branch == self.get_master_branch() is_beta = current_branch == self.get_beta_branch() is_release = self.is_release_branch(current_branch) return not (is_master or is_beta or is_release)
def is_current_branch_of_type(self, pattern: str) ‑> Tuple[bool, Optional[List[Any]]]
-
Returns boolean indicating whether the current branch follows the pattern and the list of groups if any.
Expand source code
def is_current_branch_of_type(self, pattern: str) -> Tuple[bool, Optional[List[Any]]]: """Returns boolean indicating whether the current branch follows the pattern and the list of groups if any.""" return self._is_branch_of_type(self.get_current_branch(), pattern)
def is_dirty(self) ‑> bool
-
Determines whether repository is dirty.
Repository is considered dirty when git status returns elements which are not committed.
Expand source code
def is_dirty(self) -> bool: """Determines whether repository is dirty. Repository is considered dirty when git status returns elements which are not committed. """ return bool(self.repo.is_dirty(untracked_files=True))
def is_release_branch(self, branch_name: Optional[str]) ‑> bool
-
Checks whether the branch is a
release
branch or not.Args
branch_name
- name of the branch
Returns
True if the branch is used for
release
code; False otherwiseExpand source code
def is_release_branch(self, branch_name: Optional[str]) -> bool: """Checks whether the branch is a `release` branch or not. Args: branch_name: name of the branch Returns: True if the branch is used for `release` code; False otherwise """ branch_pattern = configuration.get_value(ConfigurationVariable.RELEASE_BRANCH_PATTERN) is_release, _ = self._is_branch_of_type(branch_name, branch_pattern) return is_release
def list_branches(self) ‑> list
-
Gets the list of branches.
Returns
list of branches
Expand source code
def list_branches(self) -> list: """Gets the list of branches. Returns: list of branches """ return [b for b in self.repo.heads]
def list_files_added_on_current_branch(self) ‑> List[str]
-
Returns a list of files changed against master branch.
Expand source code
def list_files_added_on_current_branch(self) -> List[str]: """Returns a list of files changed against master branch.""" master_branch = self.get_master_branch() beta_branch = self.get_beta_branch() master_branch_commit = self.repo.commit(master_branch) beta_branch_commit = self.repo.commit(beta_branch) current_branch_commit = self.repo.commit(self.get_current_branch()) # Finding the baseline branch to consider master_branch_point = self.repo.commit(self.get_branch_point(master_branch_commit, current_branch_commit)) beta_branch_point = self.repo.commit(self.get_branch_point(beta_branch_commit, current_branch_commit)) branch_point = master_branch_point if not master_branch: branch_point = beta_branch_point elif beta_branch and master_branch: if beta_branch_point.committed_datetime > master_branch_point.committed_datetime: # The branch point off `beta` is more recent than off `master`. # Hence, the difference between current and `beta` should be considered. branch_point = beta_branch_point return self.get_changes_list(branch_point, current_branch_commit, change_type="a")
def list_files_added_to_current_commit(self) ‑> List[str]
-
Returns a list of files added in the current commit.
Expand source code
def list_files_added_to_current_commit(self) -> List[str]: """Returns a list of files added in the current commit.""" current_commit = self.repo.head.commit previous_commit = self.repo.commit("HEAD~1") if not current_commit: current_commit = self.get_current_commit() return self.get_changes_list(previous_commit, current_commit, change_type="a")
def merge(self, branch: Any) ‑> None
-
Merges
branch
to current branch.Args
branch
- branch to merge
Expand source code
def merge(self, branch: Any) -> None: """Merges `branch` to current branch. Args: branch: branch to merge """ current_branch = self.get_current_branch() merge_base = self.repo.merge_base(branch, current_branch) self.repo.index.merge_tree(current_branch, base=merge_base) self.commit(f"Merge from {str(branch)}", parent_commits=(branch.commit, current_branch.commit))
def pull(self) ‑> None
-
Pulls changes on current branch from the remote repository.
Expand source code
def pull(self) -> None: """Pulls changes on current branch from the remote repository.""" if self.remote_branch_exists(self.get_current_branch()): self.repo.git.pull(self._get_remote(), self.get_current_branch(), quiet=True)
def pull_all(self) ‑> None
-
Pulls all changes from all remotes.
Expand source code
def pull_all(self) -> None: """Pulls all changes from all remotes.""" self.repo.git.pull(all=True, force=True, quiet=True)
def push(self) ‑> None
-
Pushes commits.
Pushes changes to the remote repository. Pushes also relevant annotated tags when pushing branches out.
Expand source code
def push(self) -> None: """Pushes commits. Pushes changes to the remote repository. Pushes also relevant annotated tags when pushing branches out. """ self.repo.git.push("--follow-tags", "--set-upstream", self._get_remote(), self.get_current_branch())
def push_tag(self) ‑> None
-
Pushes commits and tags.
Pushes changes to the remote repository. Tags are also pushed as part of the process
Expand source code
def push_tag(self) -> None: """Pushes commits and tags. Pushes changes to the remote repository. Tags are also pushed as part of the process """ self.repo.git.push(tags=True)
def remote_branch_exists(self, branch_name: str) ‑> bool
-
Checks whether a branch in the remote repository exists.
Args
branch_name
- name of the branch
Returns
True if there is a remote branch called
branch_name
; False otherwiseExpand source code
def remote_branch_exists(self, branch_name: str) -> bool: """Checks whether a branch in the remote repository exists. Args: branch_name: name of the branch Returns: True if there is a remote branch called `branch_name`; False otherwise """ return self.get_remote_branch(branch_name) is not None
def set_remote_url(self, url: str) ‑> None
-
Sets the URL of the remote repository.
Args
url
- URL
Expand source code
def set_remote_url(self, url: str) -> None: """Sets the URL of the remote repository. Args: url: URL """ remote = self._get_remote() if remote: self.repo.delete_remote(str(remote)) self.repo.create_remote(configuration.get_value(ConfigurationVariable.REMOTE_ALIAS), url=url)
def set_upstream_branch(self, branch_name: str) ‑> None
-
Sets the upstream branch of the current branch.
Args
branch_name
- name of the remote branch.
Expand source code
def set_upstream_branch(self, branch_name: str) -> None: """Sets the upstream branch of the current branch. Args: branch_name: name of the remote branch. """ if self.remote_branch_exists(branch_name): self.repo.git.branch("--set-upstream-to", self.get_remote_branch(branch_name))
def stash(self) ‑> None
-
Stashes the repository.
Performs a stash.
Expand source code
def stash(self) -> None: """Stashes the repository. Performs a stash. """ self.repo.git.stash(all=True, quiet=True)
class LocalProjectRepository
-
Context manager providing a git wrapper over the current project's repository.
Constructor.
Expand source code
class LocalProjectRepository: """Context manager providing a git wrapper over the current project's repository.""" def __init__(self) -> None: """Constructor.""" self._repo = ProjectGitWrapper() def __enter__(self) -> ProjectGitWrapper: """Context manager entry point.""" return self._repo def __exit__(self, type: Any, value: Any, traceback: Any) -> None: """Context manager exit point.""" pass
class ProjectGitWrapper
-
Wrapper class over project's repository.
Creates a Git Wrapper.
Expand source code
class ProjectGitWrapper(GitWrapper): """Wrapper class over project's repository.""" def __init__(self) -> None: """Creates a Git Wrapper.""" super().__init__( path=Path(configuration.get_value(ConfigurationVariable.PROJECT_ROOT)), repo=Repo(configuration.get_value(ConfigurationVariable.PROJECT_ROOT)), )
Ancestors
Inherited members
GitWrapper
:add
apply_uncommitted_changes
branch_exists
checkout
checkout_branch
cherry_pick
clean
clone
commit
configure_author
configure_for_github
create_branch
create_tag
delete_branch
fetch
force_pull
force_push
force_push_tag
get_beta_branch
get_branch
get_branch_point
get_changes_list
get_commit_count
get_commit_hash
get_corresponding_path
get_current_branch
get_current_commit
get_master_branch
get_remote_branch
get_remote_url
git_version
is_current_branch_feature
is_current_branch_of_type
is_dirty
is_release_branch
list_branches
list_files_added_on_current_branch
list_files_added_to_current_commit
merge
pull
pull_all
push
push_tag
remote_branch_exists
root
set_remote_url
set_upstream_branch
stash
uncommitted_changes
uncommitted_staged_changes
class ProjectTempClone (desired_branch_name: Optional[str] = None)
-
Temporary cloned repository for current project.
Constructor.
Args
desired_branch_name
- the branch to consider. if not specified, the
system will try to identify the current branch in the repository which will work in most cases but probably not on CI.
Expand source code
class ProjectTempClone(GitTempClone): """Temporary cloned repository for current project.""" def __init__(self, desired_branch_name: Optional[str] = None): """Constructor. Args: desired_branch_name: the branch to consider. if not specified, the system will try to identify the current branch in the repository which will work in most cases but probably not on CI. """ super().__init__(desired_branch_name=desired_branch_name, repository_to_clone=ProjectGitWrapper())
Ancestors