Commit 80d2106d authored by Ahmad Sherif's avatar Ahmad Sherif

Migrate GetBlob to Gitaly

parent eeae2054
......@@ -2,6 +2,7 @@ package main
import (
"bytes"
"encoding/base64"
"fmt"
"math/rand"
"net"
......@@ -9,6 +10,7 @@ import (
"os"
"os/exec"
"path"
"strconv"
"strings"
"testing"
"time"
......@@ -324,6 +326,61 @@ func TestPostUploadPackHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) {
testhelper.AssertResponseHeader(t, resp, "Content-Type", "application/x-git-upload-pack-result")
}
func TestGetBlobProxiedToGitalySuccessfully(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
defer gitalyServer.Stop()
gitalyAddress := "unix://" + socketPath
pathEncoded := base64.StdEncoding.EncodeToString([]byte("LICENSE"))
repoStorage := "default"
revisionEncoded := base64.StdEncoding.EncodeToString([]byte("54fcc214b94e78d7a41a9a8fe6d87a5e59500e51"))
repoRelativePath := "foo/bar.git"
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"TreeEntryRequest":{"repository":{"storage_name":"%s","relative_path":"%s"},"revision":"%s","path":"%s"}}`,
gitalyAddress, repoStorage, repoRelativePath, revisionEncoded, pathEncoded)
expectedBody := testhelper.GitalyTreeEntryResponseMock
blobLength := len(expectedBody)
resp, body, err := doSendDataRequest("/something", "git-blob", jsonParams)
require.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL)
assert.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL)
assert.Equal(t, blobLength, len(body), "GET %q: body size", resp.Request.URL)
testhelper.AssertResponseHeader(t, resp, "Content-Length", strconv.Itoa(blobLength))
}
func TestGetBlobProxiedToGitalyInterruptedStream(t *testing.T) {
gitalyServer, socketPath := startGitalyServer(t, codes.OK)
defer gitalyServer.Stop()
gitalyAddress := "unix://" + socketPath
pathEncoded := base64.StdEncoding.EncodeToString([]byte("LICENSE"))
repoStorage := "default"
revisionEncoded := base64.StdEncoding.EncodeToString([]byte("54fcc214b94e78d7a41a9a8fe6d87a5e59500e51"))
repoRelativePath := "foo/bar.git"
jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"TreeEntryRequest":{"repository":{"storage_name":"%s","relative_path":"%s"},"revision":"%s","path":"%s"}}`,
gitalyAddress, repoStorage, repoRelativePath, revisionEncoded, pathEncoded)
resp, _, err := doSendDataRequest("/something", "git-blob", jsonParams)
require.NoError(t, err)
// This causes the server stream to be interrupted instead of consumed entirely.
resp.Body.Close()
done := make(chan struct{})
go func() {
gitalyServer.WaitGroup.Wait()
close(done)
}()
select {
case <-done:
return
case <-time.After(10 * time.Second):
t.Fatal("time out waiting for gitaly handler to return")
}
}
type combinedServer struct {
*grpc.Server
*testhelper.GitalyTestServer
......@@ -340,6 +397,7 @@ func startGitalyServer(t *testing.T, finalMessageCode codes.Code) (*combinedServ
gitalyServer := testhelper.NewGitalyServer(finalMessageCode)
pb.RegisterSmartHTTPServer(server, gitalyServer)
pb.RegisterCommitServer(server, gitalyServer)
go server.Serve(listener)
......
......@@ -7,12 +7,20 @@ import (
"net/http"
"strings"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
)
type blob struct{ senddata.Prefix }
type blobParams struct{ RepoPath, BlobId string }
type blobParams struct {
RepoPath string
BlobId string
GitalyServer gitaly.Server
TreeEntryRequest pb.TreeEntryRequest
}
var SendBlob = &blob{"git-blob:"}
......@@ -23,6 +31,25 @@ func (b *blob) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
return
}
if params.GitalyServer.Address != "" {
handleSendBlobWithGitaly(w, r, &params)
} else {
handleSendBlobLocally(w, r, &params)
}
}
func handleSendBlobWithGitaly(w http.ResponseWriter, r *http.Request, params *blobParams) {
commitClient, err := gitaly.NewCommitClient(params.GitalyServer)
if err != nil {
helper.Fail500(w, r, fmt.Errorf("commit.GetBlob: %v", err))
}
if err := commitClient.SendBlob(w, &params.TreeEntryRequest); err != nil {
helper.Fail500(w, r, fmt.Errorf("commit.GetBlob: %v", err))
}
}
func handleSendBlobLocally(w http.ResponseWriter, r *http.Request, params *blobParams) {
log.Printf("SendBlob: sending %q for %q", params.BlobId, r.URL.Path)
sizeOutput, err := gitCommand("", "", "git", "--git-dir="+params.RepoPath, "cat-file", "-s", params.BlobId).Output()
......
package gitaly
import (
"fmt"
"io"
"net/http"
"strconv"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
"golang.org/x/net/context"
)
type CommitClient struct {
pb.CommitClient
}
func (client *CommitClient) SendBlob(w http.ResponseWriter, request *pb.TreeEntryRequest) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
c, err := client.TreeEntry(ctx, request)
if err != nil {
return fmt.Errorf("rpc failed: %v", err)
}
firstResponseReceived := false
rr := pbhelper.NewReceiveReader(func() ([]byte, error) {
resp, err := c.Recv()
if !firstResponseReceived && err == nil {
firstResponseReceived = true
w.Header().Set("Content-Length", strconv.FormatInt(resp.GetSize(), 10))
}
return resp.GetData(), err
})
if _, err := io.Copy(w, rr); err != nil {
return fmt.Errorf("copy rpc data: %v", err)
}
return nil
}
......@@ -36,6 +36,15 @@ func NewSmartHTTPClient(server Server) (*SmartHTTPClient, error) {
return &SmartHTTPClient{grpcClient}, nil
}
func NewCommitClient(server Server) (*CommitClient, error) {
conn, err := getOrCreateConnection(server)
if err != nil {
return nil, err
}
grpcClient := pb.NewCommitClient(conn)
return &CommitClient{grpcClient}, nil
}
func getOrCreateConnection(server Server) (*grpc.ClientConn, error) {
cache.Lock()
defer cache.Unlock()
......
......@@ -11,6 +11,7 @@ import (
pb "gitlab.com/gitlab-org/gitaly-proto/go"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
......@@ -22,6 +23,7 @@ type GitalyTestServer struct {
var (
GitalyInfoRefsResponseMock = strings.Repeat("Mock Gitaly InfoRefsResponse data", 100000)
GitalyTreeEntryResponseMock = strings.Repeat("Mock Gitaly TreeEntryResponse data", 100000)
GitalyReceivePackResponseMock []byte
GitalyUploadPackResponseMock []byte
)
......@@ -167,6 +169,46 @@ func (s *GitalyTestServer) PostUploadPack(stream pb.SmartHTTP_PostUploadPackServ
return s.finalError()
}
func (s *GitalyTestServer) CommitIsAncestor(ctx context.Context, in *pb.CommitIsAncestorRequest) (*pb.CommitIsAncestorResponse, error) {
return nil, nil
}
func (s *GitalyTestServer) TreeEntry(in *pb.TreeEntryRequest, stream pb.Commit_TreeEntryServer) error {
s.WaitGroup.Add(1)
defer s.WaitGroup.Done()
if err := validateRepository(in.GetRepository()); err != nil {
return err
}
response := &pb.TreeEntryResponse{
Type: pb.TreeEntryResponse_BLOB,
Oid: "deadfacedeadfacedeadfacedeadfacedeadface",
Size: int64(len(GitalyTreeEntryResponseMock)),
Mode: 0100644,
}
nSends, err := sendBytes([]byte(GitalyTreeEntryResponseMock), 100, func(p []byte) error {
response.Data = p
if err := stream.Send(response); err != nil {
return err
}
// Use a new response so we don't send other fields (Size, ...) over and over
response = &pb.TreeEntryResponse{}
return nil
})
if err != nil {
return err
}
if nSends <= 1 {
panic("should have sent more than one message")
}
return s.finalError()
}
// sendBytes returns the number of times the 'sender' function was called and an error.
func sendBytes(data []byte, chunkSize int, sender func([]byte) error) (int, error) {
i := 0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment