Skip to content

Commit

Permalink
🎨 fix lints for rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
cairoeth committed Mar 24, 2024
1 parent 3b183be commit 4a22ae3
Show file tree
Hide file tree
Showing 25 changed files with 527 additions and 532 deletions.
2 changes: 1 addition & 1 deletion rpc/cmd/mockbackend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func main() {
port := 8090
http.HandleFunc("/", testutils.RpcBackendHandler)
http.HandleFunc("/", testutils.RPCBackendHandler)
fmt.Printf("rpc backend listening on localhost:%d\n", port)
http.ListenAndServe(fmt.Sprintf("localhost:%d", port), nil)
}
27 changes: 11 additions & 16 deletions rpc/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,28 @@ var (

// defaults
defaultDebug = os.Getenv("DEBUG") == "1"
defaultLogJSON = os.Getenv("LOG_JSON") == "1"
defaultListenAddress = "127.0.0.1:9000"
defaultDrainAddress = "127.0.0.1:9001"
defaultDrainSeconds = 60
defaultProxyUrl = "http://127.0.0.1:8545"
defaultProxyURL = "http://127.0.0.1:8545"
defaultProxyTimeoutSeconds = 10
defaultRelayUrl = "http://localhost:8080"
defaultRedisUrl = "localhost:6379"
defaultServiceName = os.Getenv("SERVICE_NAME")
defaultRelayURL = "http://localhost:8080"
defaultRedisURL = "localhost:6379"
defaultFetchInfoIntervalSeconds = 600

// cli flags
versionPtr = flag.Bool("version", false, "just print the program version")
listenAddress = flag.String("listen", getEnvAsStrOrDefault("LISTEN_ADDR", defaultListenAddress), "Listen address")
drainAddress = flag.String("drain", getEnvAsStrOrDefault("DRAIN_ADDR", defaultDrainAddress), "Drain address")
drainSeconds = flag.Int("drainSeconds", getEnvAsIntOrDefault("DRAIN_SECONDS", defaultDrainSeconds), "seconds to wait for graceful shutdown")
fetchIntervalSeconds = flag.Int("fetchIntervalSeconds", getEnvAsIntOrDefault("FETCH_INFO_INTERVAL_SECONDS", defaultFetchInfoIntervalSeconds), "seconds between builder info fetches")
builderInfoSource = flag.String("builderInfoSource", getEnvAsStrOrDefault("BUILDER_INFO_SOURCE", ""), "URL for json source of actual builder info")
proxyUrl = flag.String("proxy", getEnvAsStrOrDefault("PROXY_URL", defaultProxyUrl), "URL for default JSON-RPC proxy target (eth node, Infura, etc.)")
proxyURL = flag.String("proxy", getEnvAsStrOrDefault("PROXY_URL", defaultProxyURL), "URL for default JSON-RPC proxy target (eth node, Infura, etc.)")
proxyTimeoutSeconds = flag.Int("proxyTimeoutSeconds", getEnvAsIntOrDefault("PROXY_TIMEOUT_SECONDS", defaultProxyTimeoutSeconds), "proxy client timeout in seconds")
redisUrl = flag.String("redis", getEnvAsStrOrDefault("REDIS_URL", defaultRedisUrl), "URL for Redis (use 'dev' to use integrated in-memory redis)")
relayUrl = flag.String("relayUrl", getEnvAsStrOrDefault("RELAY_URL", defaultRelayUrl), "URL for preconf rpc")
redisURL = flag.String("redis", getEnvAsStrOrDefault("REDIS_URL", defaultRedisURL), "URL for Redis (use 'dev' to use integrated in-memory redis)")
relayURL = flag.String("relayURL", getEnvAsStrOrDefault("RELAY_URL", defaultRelayURL), "URL for preconf rpc")
relaySigningKey = flag.String("signingKey", os.Getenv("RELAY_SIGNING_KEY"), "Signing key for relay requests")
psqlDsn = flag.String("psql", os.Getenv("POSTGRES_DSN"), "Postgres DSN")
debugPtr = flag.Bool("debug", defaultDebug, "print debug output")
logJSONPtr = flag.Bool("logJSON", defaultLogJSON, "log in JSON")
serviceName = flag.String("serviceName", defaultServiceName, "name of the service which will be used in the logs")
)

func main() {
Expand Down Expand Up @@ -114,25 +109,25 @@ func main() {
db = database.NewPostgresStore(*psqlDsn)
}
// Start the endpoint
s, err := server.NewRpcEndPointServer(server.Configuration{
s, err := server.NewRPCEndPointServer(server.Configuration{
DB: db,
DrainAddress: *drainAddress,
DrainSeconds: *drainSeconds,
ListenAddress: *listenAddress,
Logger: logger,
ProxyTimeoutSeconds: *proxyTimeoutSeconds,
ProxyUrl: *proxyUrl,
RedisUrl: *redisUrl,
ProxyURL: *proxyURL,
RedisURL: *redisURL,
RelaySigningKey: key,
RelayUrl: *relayUrl,
RelayUrl: *relayURL,
Version: version,
BuilderInfoSource: *builderInfoSource,
FetchInfoInterval: *fetchIntervalSeconds,
})
if err != nil {
logger.Error("Server init error", zap.Error(err))
}
logger.Info("Starting rpc-endpoint...", zap.String("relayUrl", *relayUrl), zap.String("proxyUrl", *proxyUrl))
logger.Info("Starting rpc-endpoint...", zap.String("relayURL", *relayURL), zap.String("proxyURL", *proxyURL))
s.Start()
}

Expand Down
2 changes: 1 addition & 1 deletion rpc/database/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewMemStore() *memStore {
func (m *memStore) SaveRequestEntry(entry RequestEntry) error {
m.mutex.Lock()
defer m.mutex.Unlock()
m.Requests[entry.Id] = entry
m.Requests[entry.ID] = entry
return nil
}

Expand Down
14 changes: 7 additions & 7 deletions rpc/database/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,29 @@ import (

// RequestEntry to store each request
type RequestEntry struct {
Id uuid.UUID `db:"id"`
ID uuid.UUID `db:"id"`
ReceivedAt time.Time `db:"received_at"`
InsertedAt time.Time `db:"inserted_at"`
RequestDurationMs int64 `db:"request_duration_ms"`
IsBatchRequest bool `db:"is_batch_request"`
NumRequestInBatch int `db:"num_request_in_batch"`
HttpMethod string `db:"http_method"`
HttpUrl string `db:"http_url"`
HttpQueryParam string `db:"http_query_param"`
HttpResponseStatus int `db:"http_response_status"`
HTTPMethod string `db:"http_method"`
HTTPURL string `db:"http_url"`
HTTPQueryParam string `db:"http_query_param"`
HTTPResponseStatus int `db:"http_response_status"`
Origin string `db:"origin"`
Host string `db:"host"`
Error string `db:"error"`
}

// EthSendRawTxEntry to store each eth_sendRawTransaction calls
type EthSendRawTxEntry struct {
Id uuid.UUID `db:"id"`
ID uuid.UUID `db:"id"`
RequestId uuid.UUID `db:"request_id"` // id from RequestEntry table
InsertedAt time.Time `db:"inserted_at"`
IsOnOafcList bool `db:"is_on_oafc_list"`
IsWhiteHatBundleCollection bool `db:"is_white_hat_bundle_collection"`
WhiteHatBundleId string `db:"white_hat_bundle_id"`
WhiteHatBundleID string `db:"white_hat_bundle_id"`
IsCancelTx bool `db:"is_cancel_tx"`
NeedsFrontRunningProtection bool `db:"needs_front_running_protection"`
WasSentToRelay bool `db:"was_sent_to_relay"`
Expand Down
4 changes: 2 additions & 2 deletions rpc/server/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type Configuration struct {
ListenAddress string
Logger *zap.Logger
ProxyTimeoutSeconds int
ProxyUrl string
RedisUrl string
ProxyURL string
RedisURL string
RelaySigningKey *ecdsa.PrivateKey
RelayUrl string
Version string
Expand Down
26 changes: 13 additions & 13 deletions rpc/server/redisstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ var (
RedisExpirySenderMaxNonce = 2 * time.Hour
)

// Enable lookup of bundle txs by bundleId
// Enable lookup of bundle txs by bundleID
var (
RedisPrefixWhitehatBundleTransactions = RedisPrefix + "tx-for-whitehat-bundle:"
RedisExpiryWhitehatBundleTransactions = 24 * time.Hour // 1 day
)

// Enable lookup of bundle txs by bundleId
// Enable lookup of bundle txs by bundleID
var (
RedisPrefixBlockedTxHash = RedisPrefix + "blocked-tx-hash:"
RedisExpiryBlockedTxHash = 24 * time.Hour // 1 day
Expand Down Expand Up @@ -79,8 +79,8 @@ func RedisKeySenderMaxNonce(txFrom string) string {
return RedisPrefixSenderMaxNonce + strings.ToLower(txFrom)
}

func RedisKeyWhitehatBundleTransactions(bundleId string) string {
return RedisPrefixWhitehatBundleTransactions + strings.ToLower(bundleId)
func RedisKeyWhitehatBundleTransactions(bundleID string) string {
return RedisPrefixWhitehatBundleTransactions + strings.ToLower(bundleID)
}

func RedisKeyBlockedTxHash(txHash string) string {
Expand All @@ -95,9 +95,9 @@ type RedisState struct {
RedisClient *redis.Client
}

func NewRedisState(redisUrl string) (*RedisState, error) {
func NewRedisState(redisURL string) (*RedisState, error) {
// Setup redis client and check connection
redisClient := redis.NewClient(&redis.Options{Addr: redisUrl})
redisClient := redis.NewClient(&redis.Options{Addr: redisURL})

// Try to get a key to see if there's an error with the connection
if err := redisClient.Get(context.Background(), "somekey").Err(); err != nil && err != redis.Nil {
Expand Down Expand Up @@ -203,11 +203,11 @@ func (s *RedisState) GetSenderOfTxHash(txHash string) (txSender string, found bo
}

// Enable lookup of tx bundles by bundle ID
func (s *RedisState) AddTxToWhitehatBundle(bundleId, signedTx string) error {
key := RedisKeyWhitehatBundleTransactions(bundleId)
func (s *RedisState) AddTxToWhitehatBundle(bundleID, signedTx string) error {
key := RedisKeyWhitehatBundleTransactions(bundleID)

// Check if item already exists
txs, err := s.GetWhitehatBundleTx(bundleId)
txs, err := s.GetWhitehatBundleTx(bundleID)
if err == nil {
for _, tx := range txs {
if signedTx == tx {
Expand All @@ -233,13 +233,13 @@ func (s *RedisState) AddTxToWhitehatBundle(bundleId, signedTx string) error {
return err
}

func (s *RedisState) GetWhitehatBundleTx(bundleId string) ([]string, error) {
key := RedisKeyWhitehatBundleTransactions(bundleId)
func (s *RedisState) GetWhitehatBundleTx(bundleID string) ([]string, error) {
key := RedisKeyWhitehatBundleTransactions(bundleID)
return s.RedisClient.LRange(context.Background(), key, 0, -1).Result()
}

func (s *RedisState) DelWhitehatBundleTx(bundleId string) error {
key := RedisKeyWhitehatBundleTransactions(bundleId)
func (s *RedisState) DelWhitehatBundleTx(bundleID string) error {
key := RedisKeyWhitehatBundleTransactions(bundleID)
return s.RedisClient.Del(context.Background(), key).Err()
}

Expand Down
18 changes: 9 additions & 9 deletions rpc/server/redisstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,38 +201,38 @@ func TestSenderMaxNonce(t *testing.T) {

func TestWhitehatTx(t *testing.T) {
resetRedis()
bundleId := "123"
bundleID := "123"

// get (empty)
txs, err := redisState.GetWhitehatBundleTx(bundleId)
txs, err := redisState.GetWhitehatBundleTx(bundleID)
require.Nil(t, err, err)
require.Equal(t, 0, len(txs))

// add #1
tx1 := "0xa12345"
tx2 := "0xb123456"
err = redisState.AddTxToWhitehatBundle(bundleId, tx1)
err = redisState.AddTxToWhitehatBundle(bundleID, tx1)
require.Nil(t, err, err)

txs, err = redisState.GetWhitehatBundleTx(bundleId)
txs, err = redisState.GetWhitehatBundleTx(bundleID)
require.Nil(t, err, err)
require.Equal(t, 1, len(txs))

err = redisState.AddTxToWhitehatBundle(bundleId, tx1)
err = redisState.AddTxToWhitehatBundle(bundleID, tx1)
require.Nil(t, err, err)
err = redisState.AddTxToWhitehatBundle(bundleId, tx2)
err = redisState.AddTxToWhitehatBundle(bundleID, tx2)
require.Nil(t, err, err)

txs, err = redisState.GetWhitehatBundleTx(bundleId)
txs, err = redisState.GetWhitehatBundleTx(bundleID)
require.Nil(t, err, err)
require.Equal(t, 2, len(txs))
require.Equal(t, tx2, txs[0])
require.Equal(t, tx1, txs[1])

err = redisState.DelWhitehatBundleTx(bundleId)
err = redisState.DelWhitehatBundleTx(bundleID)
require.Nil(t, err, err)

txs, err = redisState.GetWhitehatBundleTx(bundleId)
txs, err = redisState.GetWhitehatBundleTx(bundleID)
require.Nil(t, err, err)
require.Equal(t, 0, len(txs))
}
Expand Down
46 changes: 23 additions & 23 deletions rpc/server/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,31 @@ import (
)

// RPC request handler for a single/ batch JSON-RPC request
type RpcRequestHandler struct {
type RPCRequestHandler struct {
respw *http.ResponseWriter
req *http.Request
logger *zap.Logger
timeStarted time.Time
defaultProxyUrl string
defaultProxyURL string
proxyTimeoutSeconds int
relaySigningKey *ecdsa.PrivateKey
relayUrl string
relayURL string
uid uuid.UUID
requestRecord *requestRecord
builderNames []string
chainID []byte
}

func NewRpcRequestHandler(logger *zap.Logger, respw *http.ResponseWriter, req *http.Request, proxyUrl string, proxyTimeoutSeconds int, relaySigningKey *ecdsa.PrivateKey, relayUrl string, db database.Store, builderNames []string, chainID []byte) *RpcRequestHandler {
return &RpcRequestHandler{
func NewRPCRequestHandler(logger *zap.Logger, respw *http.ResponseWriter, req *http.Request, proxyURL string, proxyTimeoutSeconds int, relaySigningKey *ecdsa.PrivateKey, relayURL string, db database.Store, builderNames []string, chainID []byte) *RPCRequestHandler {
return &RPCRequestHandler{
logger: logger,
respw: respw,
req: req,
timeStarted: Now(),
defaultProxyUrl: proxyUrl,
defaultProxyURL: proxyURL,
proxyTimeoutSeconds: proxyTimeoutSeconds,
relaySigningKey: relaySigningKey,
relayUrl: relayUrl,
relayURL: relayURL,
uid: uuid.New(),
requestRecord: NewRequestRecord(db),
builderNames: builderNames,
Expand All @@ -47,27 +47,27 @@ func NewRpcRequestHandler(logger *zap.Logger, respw *http.ResponseWriter, req *h
}

// nolint
func (r *RpcRequestHandler) process() {
func (r *RPCRequestHandler) process() {
// r.logger = r.logger.With("uid", r.uid)
r.logger.Info("[process] POST request received")

defer r.finishRequest()
r.requestRecord.requestEntry.ReceivedAt = r.timeStarted
r.requestRecord.requestEntry.Id = r.uid
r.requestRecord.requestEntry.ID = r.uid
r.requestRecord.UpdateRequestEntry(r.req, http.StatusOK, "")

whitehatBundleId := r.req.URL.Query().Get("bundle")
isWhitehatBundleCollection := whitehatBundleId != ""
whitehatBundleID := r.req.URL.Query().Get("bundle")
isWhitehatBundleCollection := whitehatBundleID != ""

origin := r.req.Header.Get("Origin")
referer := r.req.Header.Get("Referer")

// If users specify a proxy url in their rpc endpoint they can have their requests proxied to that endpoint instead of Infura
// e.g. https://rpc.flashbots.net?url=http://RPC-ENDPOINT.COM
customProxyUrl, ok := r.req.URL.Query()["url"]
if ok && len(customProxyUrl[0]) > 1 {
r.defaultProxyUrl = customProxyUrl[0]
r.logger.Info("[process] Using custom url", zap.String("url", r.defaultProxyUrl))
customProxyURL, ok := r.req.URL.Query()["url"]
if ok && len(customProxyURL[0]) > 1 {
r.defaultProxyURL = customProxyURL[0]
r.logger.Info("[process] Using custom url", zap.String("url", r.defaultProxyURL))
}

// Decode request JSON RPC
Expand All @@ -87,12 +87,12 @@ func (r *RpcRequestHandler) process() {
}

// create rpc proxy client for making proxy request
client := NewRPCProxyClient(r.logger, r.defaultProxyUrl, r.proxyTimeoutSeconds)
client := NewRPCProxyClient(r.logger, r.defaultProxyURL, r.proxyTimeoutSeconds)

r.requestRecord.UpdateRequestEntry(r.req, http.StatusOK, "") // Data analytics

// Parse JSON RPC payload
var jsonReq *types.JsonRpcRequest
var jsonReq *types.JSONRPCRequest
if err = json.Unmarshal(body, &jsonReq); err != nil {
r.logger.Warn("[process] Parse payload", zap.Error(err))
(*r.respw).WriteHeader(http.StatusBadRequest)
Expand All @@ -104,27 +104,27 @@ func (r *RpcRequestHandler) process() {
if err != nil {
r.logger.Warn("[process] Invalid auction preference", zap.Error(err))
res := AuctionPreferenceErrorToJSONRPCResponse(jsonReq, err)
r._writeRpcResponse(res)
r._writeRPCResponse(res)
return
}
// Process single request
r.processRequest(client, jsonReq, origin, referer, isWhitehatBundleCollection, r.defaultProxyUrl, urlParams)
r.processRequest(client, jsonReq, origin, referer, isWhitehatBundleCollection, r.defaultProxyURL, urlParams)
}

// processRequest handles single request
func (r *RpcRequestHandler) processRequest(client RPCProxyClient, jsonReq *types.JsonRpcRequest, origin, referer string, isWhitehatBundleCollection bool, whitehatBundleId string, urlParams URLParameters) {
func (r *RPCRequestHandler) processRequest(client RPCProxyClient, jsonReq *types.JSONRPCRequest, origin, referer string, isWhitehatBundleCollection bool, whitehatBundleID string, urlParams URLParameters) {
var entry *database.EthSendRawTxEntry
if jsonReq.Method == "eth_sendRawTransaction" {
entry = r.requestRecord.AddEthSendRawTxEntry(uuid.New())
}
// Handle single request
rpcReq := NewRpcRequest(r.logger, client, jsonReq, r.relaySigningKey, r.relayUrl, origin, referer, isWhitehatBundleCollection, whitehatBundleId, entry, urlParams, r.chainID)
rpcReq := NewRPCRequest(r.logger, client, jsonReq, r.relaySigningKey, r.relayURL, origin, referer, isWhitehatBundleCollection, whitehatBundleID, entry, urlParams, r.chainID)
res := rpcReq.ProcessRequest()
// Write response
r._writeRpcResponse(res)
r._writeRPCResponse(res)
}

func (r *RpcRequestHandler) finishRequest() {
func (r *RPCRequestHandler) finishRequest() {
reqDuration := time.Since(r.timeStarted) // At end of request, log the time it needed
r.requestRecord.requestEntry.RequestDurationMs = reqDuration.Milliseconds()
go func() {
Expand Down
Loading

0 comments on commit 4a22ae3

Please sign in to comment.