Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT: E2E PoC #4

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"

"github.com/castai/cloud-proxy/internal/castai/dummy"
"github.com/castai/cloud-proxy/internal/gcpauth"
Expand All @@ -21,12 +26,16 @@ const (

projectID = "engineering-test-353509"
location = "europe-north1-a"
testCluster = "lachezar-2708"
testCluster = "lachezar-2908"

grpcmothership = "grpc-lachezarts.localenv.cast.ai:443"
apikeyMothership = "764e828ac1267f16a09f20590902ec6986d479b39e6db080da44af14ee7fbe4e"
)

var (
runSanityTests = flag.Bool("sanity-checks", false, "run sanity checks that validate auth loading and basic executor function")
runMockCastTest = flag.Bool("mockcast", true, "run a test using a mock Cast.AI server")
runRealCastTest = flag.Bool("realcast", false, "run a test using a real Cast.AI mothership")
)

func main() {
Expand Down Expand Up @@ -67,7 +76,40 @@ func main() {
src := gcpauth.GCPCredentialsSource{}
executor := proxy.NewExecutor(src, http.DefaultClient)
client := proxy.NewClient(executor, loggerClientProxy)
client.Run(conn)
client.Run(context.Background(), conn)
}()
}

if runRealCastTest != nil && *runRealCastTest {
log.Println("run realcast tests is true, starting with connection to mothership ")

go func() {
loggerClientProxy := log.New(os.Stderr, "[CLUSTER PROXY] ", log.LstdFlags)
loggerClientProxy.Println("Starting proxy client")
tls := credentials.NewTLS(nil)
conn, err := grpc.Dial(grpcmothership, grpc.WithTransportCredentials(tls),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
}))
if err != nil {
loggerClientProxy.Panicf("Failed to connect to server: %v", err)
}
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
loggerClientProxy.Panicf("Failed to close gRPC connection: %v", err)
}
}(conn)

ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(
"authorization", fmt.Sprintf("Token %s", apikeyMothership),
))

src := gcpauth.GCPCredentialsSource{}
executor := proxy.NewExecutor(src, http.DefaultClient)
client := proxy.NewClient(executor, loggerClientProxy)
client.Run(ctx, conn)
}()
}

Expand Down
3 changes: 2 additions & 1 deletion dummy_deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ spec:
- "castai-cloud-proxy"
args:
- "-sanity-checks=false"
- "-mockcast=true"
- "-mockcast=false"
- "-realcast=true"
---
apiVersion: v1
kind: ServiceAccount
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ go 1.22

require (
cloud.google.com/go/container v1.39.0
github.com/envoyproxy/protoc-gen-validate v1.0.4
github.com/google/uuid v1.6.0
github.com/googleapis/gax-go/v2 v2.13.0
golang.org/x/oauth2 v0.22.0
google.golang.org/api v0.193.0
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
)
Expand All @@ -34,6 +36,5 @@ require (
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.6.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down
33 changes: 20 additions & 13 deletions internal/castai/dummy/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@ import (
"log"
"sync"

"github.com/google/uuid"

"github.com/castai/cloud-proxy/internal/castai/proto"
)

type Dispatcher struct {
pendingRequests map[string]chan *proto.HttpResponse
pendingRequests map[string]chan *proto.HTTPResponse
locker sync.Mutex

proxyRequestChan chan<- *proto.HttpRequest
proxyResponseChan <-chan *proto.HttpResponse
proxyRequestChan chan<- *proto.StreamCloudProxyResponse
proxyResponseChan <-chan *proto.StreamCloudProxyRequest

logger *log.Logger
}

func NewDispatcher(requestChan chan<- *proto.HttpRequest, responseChan <-chan *proto.HttpResponse, logger *log.Logger) *Dispatcher {
func NewDispatcher(requestChan chan<- *proto.StreamCloudProxyResponse, responseChan <-chan *proto.StreamCloudProxyRequest, logger *log.Logger) *Dispatcher {
return &Dispatcher{
pendingRequests: make(map[string]chan *proto.HttpResponse),
pendingRequests: make(map[string]chan *proto.HTTPResponse),
locker: sync.Mutex{},
proxyRequestChan: requestChan,
proxyResponseChan: responseChan,
Expand All @@ -32,29 +34,34 @@ func (d *Dispatcher) Run() {
d.logger.Println("starting response returning loop")
for {
for resp := range d.proxyResponseChan {
waiter := d.findWaiterForResponse(resp.RequestID)
waiter <- resp
waiter := d.findWaiterForResponse(resp.GetMessageId())
waiter <- resp.GetHttpResponse()
d.logger.Println("Sent a response back to caller")
}
}
}()
}

func (d *Dispatcher) SendRequest(req *proto.HttpRequest) (<-chan *proto.HttpResponse, error) {
waiter := d.addRequestToWaitingList(req.RequestID)
d.proxyRequestChan <- req
func (d *Dispatcher) SendRequest(req *proto.HTTPRequest) (<-chan *proto.HTTPResponse, error) {
requestID := uuid.New().String()

waiter := d.addRequestToWaitingList(requestID)
d.proxyRequestChan <- &proto.StreamCloudProxyResponse{
MessageId: requestID,
HttpRequest: req,
}
return waiter, nil
}

func (d *Dispatcher) addRequestToWaitingList(requestID string) <-chan *proto.HttpResponse {
waiter := make(chan *proto.HttpResponse, 1)
func (d *Dispatcher) addRequestToWaitingList(requestID string) <-chan *proto.HTTPResponse {
waiter := make(chan *proto.HTTPResponse, 1)
d.locker.Lock()
d.pendingRequests[requestID] = waiter
d.locker.Unlock()
return waiter
}

func (d *Dispatcher) findWaiterForResponse(requestID string) chan *proto.HttpResponse {
func (d *Dispatcher) findWaiterForResponse(requestID string) chan *proto.HTTPResponse {
d.locker.Lock()
val, ok := d.pendingRequests[requestID]
if !ok {
Expand Down
16 changes: 8 additions & 8 deletions internal/castai/dummy/mock_cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type MockCast struct {
func (mc *MockCast) Run() error {
logger := log.New(os.Stderr, "[CAST-MOCK] ", log.LstdFlags)

requestChan, respChan := make(chan *proto.HttpRequest), make(chan *proto.HttpResponse)
requestChan, respChan := make(chan *proto.StreamCloudProxyResponse), make(chan *proto.StreamCloudProxyRequest)

// Start the mock server
listener, err := net.Listen("tcp", ":50051")
Expand All @@ -34,7 +34,7 @@ func (mc *MockCast) Run() error {
}

grpcServer := grpc.NewServer()
proto.RegisterGCPProxyServerServer(grpcServer, NewMockCastServer(requestChan, respChan, logger))
proto.RegisterCloudProxyAPIServer(grpcServer, NewMockCastServer(requestChan, respChan, logger))

dispatcher := NewDispatcher(requestChan, respChan, logger)

Expand Down Expand Up @@ -68,23 +68,23 @@ func (mc *MockCast) Run() error {
}

type MockCastServer struct {
proto.UnimplementedGCPProxyServerServer
proto.UnimplementedCloudProxyAPIServer

requestChan <-chan *proto.HttpRequest
responseChan chan<- *proto.HttpResponse
requestChan <-chan *proto.StreamCloudProxyResponse
responseChan chan<- *proto.StreamCloudProxyRequest

logger *log.Logger
}

func NewMockCastServer(requestChan <-chan *proto.HttpRequest, responseChan chan<- *proto.HttpResponse, logger *log.Logger) *MockCastServer {
func NewMockCastServer(requestChan <-chan *proto.StreamCloudProxyResponse, responseChan chan<- *proto.StreamCloudProxyRequest, logger *log.Logger) *MockCastServer {
return &MockCastServer{
requestChan: requestChan,
responseChan: responseChan,
logger: logger,
}
}

func (msrv *MockCastServer) Proxy(stream proto.GCPProxyServer_ProxyServer) error {
func (msrv *MockCastServer) StreamCloudProxy(stream proto.CloudProxyAPI_StreamCloudProxyServer) error {
msrv.logger.Println("Received a proxy connection from client")

var eg errgroup.Group
Expand Down Expand Up @@ -117,7 +117,7 @@ func (msrv *MockCastServer) Proxy(stream proto.GCPProxyServer_ProxyServer) error
return err
}

msrv.logger.Printf("Got a response from client: %v, %v\n", in.RequestID, in.Status)
msrv.logger.Printf("Got a response from client: %v, %v\n", in.GetMessageId(), in.GetHttpResponse().GetStatus())
msrv.responseChan <- in
}
})
Expand Down
29 changes: 21 additions & 8 deletions internal/castai/dummy/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,21 @@ func (p *HttpOverGrpcRoundTripper) RoundTrip(request *http.Request) (*http.Respo
for h, v := range request.Header {
headers[h] = strings.Join(v, ",")
}
protoReq := &proto.HttpRequest{
RequestID: requestID,
Method: request.Method,
Url: request.URL.String(),
Headers: headers,
protoReq := &proto.HTTPRequest{
Method: request.Method,
Path: request.URL.String(),
Headers: func() map[string]*proto.HeaderValue {
result := make(map[string]*proto.HeaderValue)
for h, v := range request.Header {
result[h] = &proto.HeaderValue{
Value: make([]string, 0, len(v)),
}
for i := range v {
result[h].Value = append(result[h].Value, v[i])
}
}
return result
}(),
Body: func() []byte {
if request.Body == nil {
return []byte{}
Expand All @@ -56,12 +66,15 @@ func (p *HttpOverGrpcRoundTripper) RoundTrip(request *http.Request) (*http.Respo

// Convert to response
resp := &http.Response{
Status: http.StatusText(int(response.Status)),
StatusCode: int(response.Status),
//Status: http.StatusText(int(response.Status)),
StatusCode: int(response.StatusCode),
Status: response.Status,
Header: func() http.Header {
headers := make(http.Header)
for key, value := range response.Headers {
headers[key] = strings.Split(value, ",")
for _, hv := range value.GetValue() {
headers[key] = append(headers[key], hv)
}
}
return headers
}(),
Expand Down
Loading