Commit c1326899 authored by Nick Thomas's avatar Nick Thomas

Merge branch 'feature/use-gitaly-for-upload-receive-pack' into 'master'

Forward {upload,receive}-pack requests to Gitaly

Closes gitaly#125

See merge request !143
parents 2b604f62 619d33bc
......@@ -57,10 +57,10 @@ func TestDownloadingFromValidArchive(t *testing.T) {
testhelper.AssertResponseCode(t, response, 200)
testhelper.AssertResponseHeader(t, response,
testhelper.AssertResponseWriterHeader(t, response,
"Content-Type",
"text/plain; charset=utf-8")
testhelper.AssertResponseHeader(t, response,
testhelper.AssertResponseWriterHeader(t, response,
"Content-Disposition",
"attachment; filename=\"test.txt\"")
......
......@@ -63,9 +63,9 @@ func TestSetArchiveHeaders(t *testing.T) {
setArchiveHeaders(w, testCase.in, "filename")
testhelper.AssertResponseHeader(t, w, "Content-Type", testCase.out)
testhelper.AssertResponseHeader(t, w, "Content-Length")
testhelper.AssertResponseHeader(t, w, "Content-Disposition", `attachment; filename="filename"`)
testhelper.AssertResponseHeader(t, w, "Cache-Control", "private")
testhelper.AssertResponseWriterHeader(t, w, "Content-Type", testCase.out)
testhelper.AssertResponseWriterHeader(t, w, "Content-Length")
testhelper.AssertResponseWriterHeader(t, w, "Content-Disposition", `attachment; filename="filename"`)
testhelper.AssertResponseWriterHeader(t, w, "Cache-Control", "private")
}
}
......@@ -2,9 +2,11 @@ package git
import (
"fmt"
"io"
"net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
......@@ -16,7 +18,19 @@ func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Respons
cr, cw := helper.NewWriteAfterReader(r.Body, w)
defer cw.Flush()
cmd, err := startGitCommand(a, cr, cw, action)
var err error
if a.GitalySocketPath == "" {
err = handleReceivePackLocally(a, r, cr, cw, action)
} else {
err = handleReceivePackWithGitaly(a, cr, cw)
}
return err
}
func handleReceivePackLocally(a *api.Response, r *http.Request, stdin io.Reader, stdout io.Writer, action string) error {
cmd, err := startGitCommand(a, stdin, stdout, action)
if err != nil {
return fmt.Errorf("startGitCommand: %v", err)
}
......@@ -30,3 +44,16 @@ func handleReceivePack(w *GitHttpResponseWriter, r *http.Request, a *api.Respons
return nil
}
func handleReceivePackWithGitaly(a *api.Response, clientRequest io.Reader, clientResponse io.Writer) error {
smarthttp, err := gitaly.NewSmartHTTPClient(a.GitalySocketPath)
if err != nil {
return fmt.Errorf("smarthttp.ReceivePack: %v", err)
}
if err := smarthttp.ReceivePack(a, clientRequest, clientResponse); err != nil {
return fmt.Errorf("smarthttp.ReceivePack: %v", err)
}
return nil
}
......@@ -4,8 +4,10 @@ import (
"fmt"
"io"
"net/http"
"os"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)
......@@ -22,15 +24,25 @@ func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response
defer buffer.Close()
r.Body.Close()
isShallowClone := scanDeepen(buffer)
if _, err := buffer.Seek(0, 0); err != nil {
return fmt.Errorf("seek tempfile: %v", err)
}
action := getService(r)
writePostRPCHeader(w, action)
cmd, err := startGitCommand(a, buffer, w, action)
if a.GitalySocketPath == "" {
err = handleUploadPackLocally(a, r, buffer, w, action)
} else {
err = handleUploadPackWithGitaly(a, buffer, w)
}
return err
}
func handleUploadPackLocally(a *api.Response, r *http.Request, stdin *os.File, stdout io.Writer, action string) error {
isShallowClone := scanDeepen(stdin)
if _, err := stdin.Seek(0, 0); err != nil {
return fmt.Errorf("seek tempfile: %v", err)
}
cmd, err := startGitCommand(a, stdin, stdout, action)
if err != nil {
return fmt.Errorf("startGitCommand: %v", err)
}
......@@ -44,3 +56,16 @@ func handleUploadPack(w *GitHttpResponseWriter, r *http.Request, a *api.Response
return nil
}
func handleUploadPackWithGitaly(a *api.Response, clientRequest io.Reader, clientResponse io.Writer) error {
smarthttp, err := gitaly.NewSmartHTTPClient(a.GitalySocketPath)
if err != nil {
return fmt.Errorf("smarthttp.UploadPack: %v", err)
}
if err := smarthttp.UploadPack(a, clientRequest, clientResponse); err != nil {
return fmt.Errorf("smarthttp.UploadPack: %v", err)
}
return nil
}
......@@ -4,15 +4,29 @@ import (
"fmt"
"io"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type SmartHTTPClient struct {
pb.SmartHTTPClient
}
type uploadPackWriter struct {
pb.SmartHTTP_PostUploadPackClient
}
type receivePackWriter struct {
pb.SmartHTTP_PostReceivePackClient
}
const sendChunkSize = 16384
func (client *SmartHTTPClient) InfoRefsResponseWriterTo(repo *pb.Repository, rpc string) (io.WriterTo, error) {
rpcRequest := &pb.InfoRefsRequest{Repository: repo}
var c pbhelper.InfoRefsClient
......@@ -33,3 +47,112 @@ func (client *SmartHTTPClient) InfoRefsResponseWriterTo(repo *pb.Repository, rpc
return &pbhelper.InfoRefsClientWriterTo{c}, nil
}
func (client *SmartHTTPClient) ReceivePack(a *api.Response, clientRequest io.Reader, clientResponse io.Writer) error {
repo := &pb.Repository{Path: a.RepoPath}
stream, err := client.PostReceivePack(context.Background())
if err != nil {
return err
}
rpcRequest := &pb.PostReceivePackRequest{
Repository: repo,
GlId: a.GL_ID,
}
if err := stream.Send(rpcRequest); err != nil {
return fmt.Errorf("initial request: %v", err)
}
waitc := make(chan error, 1)
go receiveGitalyResponse(stream, waitc, clientResponse, func() ([]byte, error) {
response, err := stream.Recv()
return response.GetData(), err
})
_, sendErr := io.Copy(receivePackWriter{stream}, clientRequest)
stream.CloseSend()
if recvErr := <-waitc; recvErr != nil {
return recvErr
}
if sendErr != nil {
return fmt.Errorf("send: %v", sendErr)
}
return nil
}
func (client *SmartHTTPClient) UploadPack(a *api.Response, clientRequest io.Reader, clientResponse io.Writer) error {
repo := &pb.Repository{Path: a.RepoPath}
stream, err := client.PostUploadPack(context.Background())
if err != nil {
return err
}
rpcRequest := &pb.PostUploadPackRequest{
Repository: repo,
}
if err := stream.Send(rpcRequest); err != nil {
return fmt.Errorf("initial request: %v", err)
}
waitc := make(chan error, 1)
go receiveGitalyResponse(stream, waitc, clientResponse, func() ([]byte, error) {
response, err := stream.Recv()
return response.GetData(), err
})
_, sendErr := io.Copy(uploadPackWriter{stream}, clientRequest)
stream.CloseSend()
if recvErr := <-waitc; recvErr != nil {
return recvErr
}
if sendErr != nil {
return fmt.Errorf("send: %v", sendErr)
}
return nil
}
func receiveGitalyResponse(cs grpc.ClientStream, waitc chan error, clientResponse io.Writer, receiver func() ([]byte, error)) {
defer func() {
close(waitc)
cs.CloseSend()
}()
for {
data, err := receiver()
if err != nil {
if err != io.EOF {
waitc <- fmt.Errorf("receive: %v", err)
}
return
}
if _, err := clientResponse.Write(data); err != nil {
waitc <- fmt.Errorf("write: %v", err)
return
}
}
}
func (rw uploadPackWriter) Write(p []byte) (int, error) {
resp := &pb.PostUploadPackRequest{Data: p}
if err := rw.Send(resp); err != nil {
return 0, err
}
return len(p), nil
}
func (rw receivePackWriter) Write(p []byte) (int, error) {
resp := &pb.PostReceivePackRequest{Data: p}
if err := rw.Send(resp); err != nil {
return 0, err
}
return len(p), nil
}
......@@ -110,13 +110,13 @@ func testServingThePregzippedFile(t *testing.T, enableGzip bool) {
st.ServeExisting("/", CacheDisabled, nil).ServeHTTP(w, httpRequest)
testhelper.AssertResponseCode(t, w, 200)
if enableGzip {
testhelper.AssertResponseHeader(t, w, "Content-Encoding", "gzip")
testhelper.AssertResponseWriterHeader(t, w, "Content-Encoding", "gzip")
if bytes.Compare(w.Body.Bytes(), fileGzipContent.Bytes()) != 0 {
t.Error("We should serve the pregzipped file")
}
} else {
testhelper.AssertResponseCode(t, w, 200)
testhelper.AssertResponseHeader(t, w, "Content-Encoding")
testhelper.AssertResponseWriterHeader(t, w, "Content-Encoding")
if w.Body.String() != fileContent {
t.Error("We should serve the file: ", w.Body.String())
}
......
package testhelper
import (
"io"
"io/ioutil"
"log"
"path"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
)
......@@ -8,6 +13,19 @@ type GitalyTestServer struct{}
const GitalyInfoRefsResponseMock = "Mock Gitaly InfoRefsResponse data"
var GitalyReceivePackResponseMock []byte
var GitalyUploadPackResponseMock []byte
func init() {
var err error
if GitalyReceivePackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/receive-pack-fixture.txt")); err != nil {
log.Fatal(err)
}
if GitalyUploadPackResponseMock, err = ioutil.ReadFile(path.Join(RootDir(), "testdata/upload-pack-fixture.txt")); err != nil {
log.Fatal(err)
}
}
func NewGitalyServer() *GitalyTestServer {
return &GitalyTestServer{}
}
......@@ -26,6 +44,70 @@ func (s *GitalyTestServer) InfoRefsReceivePack(in *pb.InfoRefsRequest, stream pb
return stream.Send(response)
}
// TODO replace these empty implementations
func (*GitalyTestServer) PostUploadPack(pb.SmartHTTP_PostUploadPackServer) error { return nil }
func (*GitalyTestServer) PostReceivePack(pb.SmartHTTP_PostReceivePackServer) error { return nil }
func (s *GitalyTestServer) PostReceivePack(stream pb.SmartHTTP_PostReceivePackServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
response := &pb.PostReceivePackResponse{
Data: []byte(req.Repository.GetPath() + req.GlId),
}
if err := stream.Send(response); err != nil {
return err
}
// The body of the request starts in the second message
for {
req, err := stream.Recv()
if err != nil {
if err != io.EOF {
return err
}
break
}
response := &pb.PostReceivePackResponse{
Data: req.GetData(),
}
if err := stream.Send(response); err != nil {
return err
}
}
return nil
}
func (s *GitalyTestServer) PostUploadPack(stream pb.SmartHTTP_PostUploadPackServer) error {
req, err := stream.Recv()
if err != nil {
return err
}
response := &pb.PostUploadPackResponse{
Data: []byte(req.Repository.GetPath()),
}
if err := stream.Send(response); err != nil {
return err
}
// The body of the request starts in the second message
for {
req, err := stream.Recv()
if err != nil {
if err != io.EOF {
return err
}
break
}
response := &pb.PostUploadPackResponse{
Data: req.GetData(),
}
if err := stream.Send(response); err != nil {
return err
}
}
return nil
}
......@@ -71,9 +71,19 @@ func AssertResponseBodyRegexp(t *testing.T, response *httptest.ResponseRecorder,
}
}
func AssertResponseHeader(t *testing.T, w http.ResponseWriter, header string, expected ...string) {
func AssertResponseWriterHeader(t *testing.T, w http.ResponseWriter, header string, expected ...string) {
actual := w.Header()[http.CanonicalHeaderKey(header)]
assertHeaderExists(t, header, actual, expected)
}
func AssertResponseHeader(t *testing.T, w *http.Response, header string, expected ...string) {
actual := w.Header[http.CanonicalHeaderKey(header)]
assertHeaderExists(t, header, actual, expected)
}
func assertHeaderExists(t *testing.T, header string, actual, expected []string) {
if len(expected) != len(actual) {
t.Fatalf("for HTTP request expected to receive the header %q with %+v, got %+v", header, expected, actual)
}
......
......@@ -8,6 +8,7 @@ import (
"io"
"io/ioutil"
"log"
"math/rand"
"net"
"net/http"
"net/http/httptest"
......@@ -27,6 +28,8 @@ import (
"gitlab.com/gitlab-org/gitlab-workhorse/internal/upstream"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
......@@ -38,7 +41,6 @@ const testProject = "group/test"
var checkoutDir = path.Join(scratchDir, "test")
var cacheDir = path.Join(scratchDir, "cache")
var gitalySocketPath = path.Join(scratchDir, "gitaly.sock")
func TestMain(m *testing.M) {
source := "https://gitlab.com/gitlab-org/gitlab-test.git"
......@@ -60,6 +62,8 @@ func TestMain(m *testing.M) {
os.Exit(1)
}
defer gitaly.CloseConnections()
os.Exit(func() int {
defer cleanup()
return m.Run()
......@@ -594,15 +598,19 @@ func TestApiContentTypeBlock(t *testing.T) {
}
func TestGetInfoRefsProxiedToGitalySuccessfully(t *testing.T) {
gitalyServer := startGitalyServer(t)
defer func() {
gitalyServer.Stop()
gitaly.CloseConnections()
}()
apiResponse := gitOkBody(t)
repoPath := apiResponse.RepoPath
gitalyServer, socketPath := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse.GitalySocketPath = socketPath
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
for _, testCase := range []struct {
repoPath string
repository pb.Repository
......@@ -613,7 +621,7 @@ func TestGetInfoRefsProxiedToGitalySuccessfully(t *testing.T) {
func() {
apiResponse.RepoPath = testCase.repoPath
apiResponse.Repository = testCase.repository
apiResponse.GitalySocketPath = gitalySocketPath
apiResponse.GitalySocketPath = socketPath
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
......@@ -639,12 +647,83 @@ func TestGetInfoRefsProxiedToGitalySuccessfully(t *testing.T) {
}
}
func TestPostReceivePackProxiedToGitalySuccessfully(t *testing.T) {
apiResponse := gitOkBody(t)
gitalyServer, socketPath := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse.GitalySocketPath = socketPath
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
resource := "/gitlab-org/gitlab-test.git/git-receive-pack"
resp, err := http.Post(
ws.URL+resource,
"application/x-git-receive-pack-request",
bytes.NewReader(testhelper.GitalyReceivePackResponseMock),
)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
responseBody, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
testhelper.AssertResponseHeader(t, resp, "Content-Type", "application/x-git-receive-pack-result")
if resp.StatusCode != 200 {
t.Errorf("GET %q: expected 200, got %d", resource, resp.StatusCode)
}
if string(responseBody) != apiResponse.RepoPath+apiResponse.GL_ID+string(testhelper.GitalyReceivePackResponseMock) {
t.Errorf("GET %q: Unexpected response", resource)
}
}
func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) {
apiResponse := gitOkBody(t)
gitalyServer, socketPath := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse.GitalySocketPath = socketPath
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
resource := "/gitlab-org/gitlab-test.git/git-upload-pack"
resp, err := http.Post(
ws.URL+resource,
"application/x-git-upload-pack-request",
bytes.NewReader(testhelper.GitalyUploadPackResponseMock),
)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
responseBody, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
testhelper.AssertResponseHeader(t, resp, "Content-Type", "application/x-git-upload-pack-result")
if resp.StatusCode != 200 {
t.Errorf("GET %q: expected 200, got %d", resource, resp.StatusCode)
}
if string(responseBody) != apiResponse.RepoPath+string(testhelper.GitalyUploadPackResponseMock) {
t.Errorf("GET %q: Unexpected response", resource)
}
}
func TestGetInfoRefsHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) {
gitalyServer := startGitalyServer(t)
defer func() {
gitalyServer.Stop()
gitaly.CloseConnections()
}()
gitalyServer, _ := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse := gitOkBody(t)
apiResponse.GitalySocketPath = ""
......@@ -675,6 +754,68 @@ func TestGetInfoRefsHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) {
}
}
func TestPostReceivePackHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) {
gitalyServer, _ := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse := gitOkBody(t)
apiResponse.GitalySocketPath = ""
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
resource := "/gitlab-org/gitlab-test.git/git-receive-pack"
payload := []byte("This payload should not reach Gitaly")
resp, err := http.Post(ws.URL+resource, "application/x-git-receive-pack-request", bytes.NewReader(payload))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
responseBody, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
if resp.StatusCode != 200 {
t.Errorf("GET %q: expected 200, got %d", resource, resp.StatusCode)
}
if bytes.Contains(responseBody, payload) {
t.Errorf("GET %q: request should not have been proxied to Gitaly", resource)
}
}
func TestPostUploadPackHandledLocallyDueToEmptyGitalySocketPath(t *testing.T) {
gitalyServer, _ := startGitalyServer(t)
defer gitalyServer.Stop()
apiResponse := gitOkBody(t)
apiResponse.GitalySocketPath = ""
ts := testAuthServer(nil, 200, apiResponse)
defer ts.Close()
ws := startWorkhorseServer(ts.URL)
defer ws.Close()
resource := "/gitlab-org/gitlab-test.git/git-upload-pack"
payload := []byte("This payload should not reach Gitaly")
resp, err := http.Post(ws.URL+resource, "application/x-git-upload-pack-request", bytes.NewReader(payload))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
responseBody, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
if resp.StatusCode != 200 {
t.Errorf("GET %q: expected 200, got %d", resource, resp.StatusCode)
}
if bytes.Contains(responseBody, payload) {
t.Errorf("GET %q: request should not have been proxied to Gitaly", resource)
}
}
func TestAPIFalsePositivesAreProxied(t *testing.T) {
goodResponse := []byte(`<html></html>`)
ts := testhelper.TestServerWithHandler(regexp.MustCompile(`.`), func(w http.ResponseWriter, r *http.Request) {
......@@ -848,9 +989,10 @@ func startWorkhorseServerWithConfig(cfg *config.Config) *httptest.Server {
return httptest.NewServer(u)
}
func startGitalyServer(t *testing.T) *grpc.Server {
func startGitalyServer(t *testing.T) (*grpc.Server, string) {
socketPath := path.Join(scratchDir, fmt.Sprintf("gitaly-%d.sock", rand.Int()))
server := grpc.NewServer()
listener, err := net.Listen("unix", gitalySocketPath)
listener, err := net.Listen("unix", socketPath)
if err != nil {
t.Fatal(err)
}
......@@ -859,7 +1001,7 @@ func startGitalyServer(t *testing.T) *grpc.Server {
go server.Serve(listener)
return server
return server, socketPath
}
func runOrFail(t *testing.T, cmd *exec.Cmd) {
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment