forgejo/services/mirror/mirror_push.go
Lunny Xiao 5f82ead13c
Simplify how git repositories are opened (#28937)
## Purpose
This is a refactor toward building an abstraction over managing git
repositories.
Afterwards, it does not matter anymore if they are stored on the local
disk or somewhere remote.

## What this PR changes
We used `git.OpenRepository` everywhere previously.
Now, we should split them into two distinct functions:

Firstly, there are temporary repositories which do not change:

```go
git.OpenRepository(ctx, diskPath)
```

Gitea managed repositories having a record in the database in the
`repository` table are moved into the new package `gitrepo`:

```go
gitrepo.OpenRepository(ctx, repo_model.Repo)
```

Why is `repo_model.Repository` the second parameter instead of file
path?
Because then we can easily adapt our repository storage strategy.
The repositories can be stored locally, however, they could just as well
be stored on a remote server.

## Further changes in other PRs
- A Git Command wrapper on package `gitrepo` could be created. i.e.
`NewCommand(ctx, repo_model.Repository, commands...)`. `git.RunOpts{Dir:
repo.RepoPath()}`, the directory should be empty before invoking this
method and it can be filled in the function only. #28940
- Remove the `RepoPath()`/`WikiPath()` functions to reduce the
possibility of mistakes.

---------

Co-authored-by: delvh <dev.lh@web.de>
2024-01-27 21:09:51 +01:00

278 lines
7.6 KiB
Go

// Copyright 2021 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package mirror
import (
"context"
"errors"
"fmt"
"io"
"regexp"
"strings"
"time"
"code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/lfs"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/modules/util"
)
var stripExitStatus = regexp.MustCompile(`exit status \d+ - `)
// AddPushMirrorRemote registers the push mirror remote.
func AddPushMirrorRemote(ctx context.Context, m *repo_model.PushMirror, addr string) error {
addRemoteAndConfig := func(addr, path string) error {
cmd := git.NewCommand(ctx, "remote", "add", "--mirror=push").AddDynamicArguments(m.RemoteName, addr)
if strings.Contains(addr, "://") && strings.Contains(addr, "@") {
cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=push %s [repo_path: %s]", m.RemoteName, util.SanitizeCredentialURLs(addr), path))
} else {
cmd.SetDescription(fmt.Sprintf("remote add %s --mirror=push %s [repo_path: %s]", m.RemoteName, addr, path))
}
if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: path}); err != nil {
return err
}
if _, _, err := git.NewCommand(ctx, "config", "--add").AddDynamicArguments("remote."+m.RemoteName+".push", "+refs/heads/*:refs/heads/*").RunStdString(&git.RunOpts{Dir: path}); err != nil {
return err
}
if _, _, err := git.NewCommand(ctx, "config", "--add").AddDynamicArguments("remote."+m.RemoteName+".push", "+refs/tags/*:refs/tags/*").RunStdString(&git.RunOpts{Dir: path}); err != nil {
return err
}
return nil
}
if err := addRemoteAndConfig(addr, m.Repo.RepoPath()); err != nil {
return err
}
if m.Repo.HasWiki() {
wikiRemoteURL := repository.WikiRemoteURL(ctx, addr)
if len(wikiRemoteURL) > 0 {
if err := addRemoteAndConfig(wikiRemoteURL, m.Repo.WikiPath()); err != nil {
return err
}
}
}
return nil
}
// RemovePushMirrorRemote removes the push mirror remote.
func RemovePushMirrorRemote(ctx context.Context, m *repo_model.PushMirror) error {
cmd := git.NewCommand(ctx, "remote", "rm").AddDynamicArguments(m.RemoteName)
_ = m.GetRepository(ctx)
if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: m.Repo.RepoPath()}); err != nil {
return err
}
if m.Repo.HasWiki() {
if _, _, err := cmd.RunStdString(&git.RunOpts{Dir: m.Repo.WikiPath()}); err != nil {
// The wiki remote may not exist
log.Warn("Wiki Remote[%d] could not be removed: %v", m.ID, err)
}
}
return nil
}
// SyncPushMirror starts the sync of the push mirror and schedules the next run.
func SyncPushMirror(ctx context.Context, mirrorID int64) bool {
log.Trace("SyncPushMirror [mirror: %d]", mirrorID)
defer func() {
err := recover()
if err == nil {
return
}
// There was a panic whilst syncPushMirror...
log.Error("PANIC whilst syncPushMirror[%d] Panic: %v\nStacktrace: %s", mirrorID, err, log.Stack(2))
}()
// TODO: Handle "!exist" better
m, exist, err := db.GetByID[repo_model.PushMirror](ctx, mirrorID)
if err != nil || !exist {
log.Error("GetPushMirrorByID [%d]: %v", mirrorID, err)
return false
}
_ = m.GetRepository(ctx)
m.LastError = ""
ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Syncing PushMirror %s/%s to %s", m.Repo.OwnerName, m.Repo.Name, m.RemoteName))
defer finished()
log.Trace("SyncPushMirror [mirror: %d][repo: %-v]: Running Sync", m.ID, m.Repo)
err = runPushSync(ctx, m)
if err != nil {
log.Error("SyncPushMirror [mirror: %d][repo: %-v]: %v", m.ID, m.Repo, err)
m.LastError = stripExitStatus.ReplaceAllLiteralString(err.Error(), "")
}
m.LastUpdateUnix = timeutil.TimeStampNow()
if err := repo_model.UpdatePushMirror(ctx, m); err != nil {
log.Error("UpdatePushMirror [%d]: %v", m.ID, err)
return false
}
log.Trace("SyncPushMirror [mirror: %d][repo: %-v]: Finished", m.ID, m.Repo)
return err == nil
}
func runPushSync(ctx context.Context, m *repo_model.PushMirror) error {
timeout := time.Duration(setting.Git.Timeout.Mirror) * time.Second
performPush := func(repo *repo_model.Repository, isWiki bool) error {
path := repo.RepoPath()
if isWiki {
path = repo.WikiPath()
}
remoteURL, err := git.GetRemoteURL(ctx, path, m.RemoteName)
if err != nil {
log.Error("GetRemoteAddress(%s) Error %v", path, err)
return errors.New("Unexpected error")
}
if setting.LFS.StartServer {
log.Trace("SyncMirrors [repo: %-v]: syncing LFS objects...", m.Repo)
var gitRepo *git.Repository
if isWiki {
gitRepo, err = gitrepo.OpenWikiRepository(ctx, repo)
} else {
gitRepo, err = gitrepo.OpenRepository(ctx, repo)
}
if err != nil {
log.Error("OpenRepository: %v", err)
return errors.New("Unexpected error")
}
defer gitRepo.Close()
endpoint := lfs.DetermineEndpoint(remoteURL.String(), "")
lfsClient := lfs.NewClient(endpoint, nil)
if err := pushAllLFSObjects(ctx, gitRepo, lfsClient); err != nil {
return util.SanitizeErrorCredentialURLs(err)
}
}
log.Trace("Pushing %s mirror[%d] remote %s", path, m.ID, m.RemoteName)
if err := git.Push(ctx, path, git.PushOptions{
Remote: m.RemoteName,
Force: true,
Mirror: true,
Timeout: timeout,
}); err != nil {
log.Error("Error pushing %s mirror[%d] remote %s: %v", path, m.ID, m.RemoteName, err)
return util.SanitizeErrorCredentialURLs(err)
}
return nil
}
err := performPush(m.Repo, false)
if err != nil {
return err
}
if m.Repo.HasWiki() {
_, err := git.GetRemoteAddress(ctx, m.Repo.WikiPath(), m.RemoteName)
if err == nil {
err := performPush(m.Repo, true)
if err != nil {
return err
}
} else {
log.Trace("Skipping wiki: No remote configured")
}
}
return nil
}
func pushAllLFSObjects(ctx context.Context, gitRepo *git.Repository, lfsClient lfs.Client) error {
contentStore := lfs.NewContentStore()
pointerChan := make(chan lfs.PointerBlob)
errChan := make(chan error, 1)
go lfs.SearchPointerBlobs(ctx, gitRepo, pointerChan, errChan)
uploadObjects := func(pointers []lfs.Pointer) error {
err := lfsClient.Upload(ctx, pointers, func(p lfs.Pointer, objectError error) (io.ReadCloser, error) {
if objectError != nil {
return nil, objectError
}
content, err := contentStore.Get(p)
if err != nil {
log.Error("Error reading LFS object %v: %v", p, err)
}
return content, err
})
if err != nil {
select {
case <-ctx.Done():
return nil
default:
}
}
return err
}
var batch []lfs.Pointer
for pointerBlob := range pointerChan {
exists, err := contentStore.Exists(pointerBlob.Pointer)
if err != nil {
log.Error("Error checking if LFS object %v exists: %v", pointerBlob.Pointer, err)
return err
}
if !exists {
log.Trace("Skipping missing LFS object %v", pointerBlob.Pointer)
continue
}
batch = append(batch, pointerBlob.Pointer)
if len(batch) >= lfsClient.BatchSize() {
if err := uploadObjects(batch); err != nil {
return err
}
batch = nil
}
}
if len(batch) > 0 {
if err := uploadObjects(batch); err != nil {
return err
}
}
err, has := <-errChan
if has {
log.Error("Error enumerating LFS objects for repository: %v", err)
return err
}
return nil
}
func syncPushMirrorWithSyncOnCommit(ctx context.Context, repoID int64) {
pushMirrors, err := repo_model.GetPushMirrorsSyncedOnCommit(ctx, repoID)
if err != nil {
log.Error("repo_model.GetPushMirrorsSyncedOnCommit failed: %v", err)
return
}
for _, mirror := range pushMirrors {
AddPushMirrorToQueue(mirror.ID)
}
}