diff --git a/protocol/chainlib/node_error_handler.go b/protocol/chainlib/node_error_handler.go index ed345cef42..2946ad17d2 100644 --- a/protocol/chainlib/node_error_handler.go +++ b/protocol/chainlib/node_error_handler.go @@ -5,9 +5,9 @@ import ( "fmt" "io" "net" - "os" + "net/url" + "regexp" "strings" - "syscall" "github.com/goccy/go-json" @@ -23,24 +23,64 @@ import ( type genericErrorHandler struct{} func (geh *genericErrorHandler) handleConnectionError(err error) error { - if err == net.ErrWriteToConnected { - return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Write to connected connection", nil) - } else if err == net.ErrClosed { - return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Operation on closed connection", nil) - } else if err == io.EOF { - return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: End of input stream reached", nil) - } else if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { - return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Network operation timed out", nil) - } else if _, ok := err.(*net.DNSError); ok { - return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: DNS resolution failed", nil) - } else if opErr, ok := err.(*net.OpError); ok { - if sysErr, ok := opErr.Err.(*os.SyscallError); ok && sysErr.Err == syscall.ECONNREFUSED { - return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: Connection refused", nil) + // Generic error message + genericMsg := "Provider Side Failed Sending Message" + + switch { + case err == net.ErrWriteToConnected: + return utils.LavaFormatProduction(genericMsg+", Reason: Write to connected connection", nil) + case err == net.ErrClosed: + return utils.LavaFormatProduction(genericMsg+", Reason: Operation on closed connection", nil) + case err == io.EOF: + return utils.LavaFormatProduction(genericMsg+", Reason: End of input stream reached", nil) + case strings.Contains(err.Error(), "http: server gave HTTP response to HTTPS client"): + return utils.LavaFormatProduction(genericMsg+", Reason: misconfigured http endpoint as https", nil) + } + + if opErr, ok := err.(*net.OpError); ok { + switch { + case opErr.Timeout(): + return utils.LavaFormatProduction(genericMsg+", Reason: Network operation timed out", nil) + case strings.Contains(opErr.Error(), "connection refused"): + return utils.LavaFormatProduction(genericMsg+", Reason: Connection refused", nil) + default: + // Handle other OpError cases without exposing specific details + return utils.LavaFormatProduction(genericMsg+", Reason: Network operation error", nil) + } + } + if urlErr, ok := err.(*url.Error); ok { + switch { + case urlErr.Timeout(): + return utils.LavaFormatProduction(genericMsg+", Reason: url.Error issue", nil) + case strings.Contains(urlErr.Error(), "connection refused"): + return utils.LavaFormatProduction(genericMsg+", Reason: Connection refused", nil) } - } else if strings.Contains(err.Error(), "http: server gave HTTP response to HTTPS client") { - return utils.LavaFormatProduction("Provider Side Failed Sending Message, Reason: misconfigured http endpoint as https", nil) } - return nil // do not return here so the caller will return the error inside the data so it reaches the user when it doesn't match any specific cases + + if _, ok := err.(*net.DNSError); ok { + return utils.LavaFormatProduction(genericMsg+", Reason: DNS resolution failed", nil) + } + + // Mask IP addresses and potential secrets in the error message, and check if any secret was found + maskedError, foundSecret := maskSensitiveInfo(err.Error()) + if foundSecret { + // Log or handle the case when a secret was found, if necessary + utils.LavaFormatProduction(genericMsg+maskedError, nil) + } + return nil +} + +func maskSensitiveInfo(errMsg string) (string, bool) { + foundSecret := false + + // Mask IP addresses + ipRegex := regexp.MustCompile(`\b(?:\d{1,3}\.){3}\d{1,3}\b`) + if ipRegex.MatchString(errMsg) { + foundSecret = true + errMsg = ipRegex.ReplaceAllString(errMsg, "[IP_ADDRESS]") + } + + return errMsg, foundSecret } func (geh *genericErrorHandler) handleGenericErrors(ctx context.Context, nodeError error) error { diff --git a/protocol/chainlib/node_error_handler_test.go b/protocol/chainlib/node_error_handler_test.go index 5303564ac5..785c2346fc 100644 --- a/protocol/chainlib/node_error_handler_test.go +++ b/protocol/chainlib/node_error_handler_test.go @@ -1,13 +1,16 @@ package chainlib import ( + "bytes" "context" "errors" "io" "net" + "net/http" "os" "syscall" "testing" + "time" "github.com/lavanet/lava/v3/utils" "github.com/stretchr/testify/require" @@ -78,3 +81,22 @@ func TestNodeErrorHandlerGenericErrors(t *testing.T) { err = neh.handleGenericErrors(ctx, errors.New("dummy error")) require.Equal(t, err, nil) } + +func TestNodeErrorHandlerTimeout(t *testing.T) { + httpClient := &http.Client{ + Timeout: 5 * time.Minute, // we are doing a timeout by request + } + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + msgBuffer := bytes.NewBuffer([]byte{1, 2, 3}) + req, err := http.NewRequestWithContext(ctx, "test", "http://0.0.0.0:6789", msgBuffer) + require.NoError(t, err) + _, err = httpClient.Do(req) + require.Error(t, err) + utils.LavaFormatDebug(err.Error()) + genericHandler := genericErrorHandler{} + bctx := context.Background() + ret := genericHandler.handleGenericErrors(bctx, err) + utils.LavaFormatDebug(ret.Error()) + require.NotContains(t, ret.Error(), "http://0.0.0.0:6789") +} diff --git a/scripts/automation_scripts/pure_proxy.py b/scripts/automation_scripts/pure_proxy.py new file mode 100644 index 0000000000..4472727e23 --- /dev/null +++ b/scripts/automation_scripts/pure_proxy.py @@ -0,0 +1,81 @@ +import asyncio +import aiohttp +from aiohttp import web +from functools import partial + +port_url_map = { + 5555: "http://localhost:1317", # Replace with actual target URLs + 5556: "http://localhost:26657", +} + +async def proxy_handler(request, server_port): + target_url = port_url_map.get(server_port) + + if not target_url: + return web.Response(text=f"No target URL configured for port {server_port}", status=404) + + path = request.rel_url.path + query_string = request.rel_url.query_string + url = f"{target_url}{path}" + if query_string: + url += f"?{query_string}" + + print(f"Proxying request to: {url}") # Debug print + print(f"Request headers: {request.headers}") # Debug print + + try: + async with aiohttp.ClientSession() as session: + method = request.method + headers = {k: v for k, v in request.headers.items() if k.lower() not in ('host', 'content-length')} + data = await request.read() + + async with session.request(method, url, headers=headers, data=data, allow_redirects=False) as resp: + print(f"Response status: {resp.status}") # Debug print + print(f"Response headers: {resp.headers}") # Debug print + + response = web.StreamResponse(status=resp.status, headers=resp.headers) + await response.prepare(request) + + async for chunk, _ in resp.content.iter_chunks(): + await response.write(chunk) + print(f"Wrote chunk of size: {len(chunk)}") # Debug print + + await response.write_eof() + return response + + except Exception as e: + print(f"Error proxying request: {str(e)}") + return web.Response(text=f"Error proxying request: {str(e)}", status=500) + +def create_app(port): + app = web.Application() + handler = partial(proxy_handler, server_port=port) + app.router.add_route('*', '/{path:.*}', handler) + return app + +async def run_app(app, port): + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, '0.0.0.0', port) + await site.start() + print(f"Server started on port {port}") + return runner + +async def main(): + runners = [] + for port in port_url_map.keys(): + app = create_app(port) + runner = await run_app(app, port) + runners.append(runner) + + print("Proxy server is running. Press Ctrl+C to stop.") + try: + await asyncio.Event().wait() + except KeyboardInterrupt: + print("Stopping server...") + finally: + for runner in runners: + await runner.cleanup() + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/scripts/pre_setups/init_lava_only_with_node_with_python_proxy.sh b/scripts/pre_setups/init_lava_only_with_node_with_python_proxy.sh new file mode 100755 index 0000000000..f6e4dabd1a --- /dev/null +++ b/scripts/pre_setups/init_lava_only_with_node_with_python_proxy.sh @@ -0,0 +1,77 @@ +#!/bin/bash +__dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source "$__dir"/../useful_commands.sh +. "${__dir}"/../vars/variables.sh + +LOGS_DIR=${__dir}/../../testutil/debugging/logs +mkdir -p $LOGS_DIR +rm $LOGS_DIR/*.log + +killall screen +screen -wipe + +screen -d -m -S python_proxy bash -c "python3 ./scripts/automation_scripts/pure_proxy.py" + +echo "[Test Setup] installing all binaries" +make install-all + +echo "[Test Setup] setting up a new lava node" +screen -d -m -S node bash -c "./scripts/start_env_dev.sh" +screen -ls +echo "[Test Setup] sleeping 20 seconds for node to finish setup (if its not enough increase timeout)" +sleep 5 +wait_for_lava_node_to_start + +GASPRICE="0.00002ulava" +lavad tx gov submit-legacy-proposal spec-add ./cookbook/specs/ibc.json,./cookbook/specs/cosmoswasm.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/cosmossdk_45.json,./cookbook/specs/cosmossdk_full.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/cosmoshub.json,./cookbook/specs/lava.json,./cookbook/specs/osmosis.json,./cookbook/specs/fantom.json,./cookbook/specs/celo.json,./cookbook/specs/optimism.json,./cookbook/specs/arbitrum.json,./cookbook/specs/starknet.json,./cookbook/specs/aptos.json,./cookbook/specs/juno.json,./cookbook/specs/polygon.json,./cookbook/specs/evmos.json,./cookbook/specs/base.json,./cookbook/specs/canto.json,./cookbook/specs/sui.json,./cookbook/specs/solana.json,./cookbook/specs/bsc.json,./cookbook/specs/axelar.json,./cookbook/specs/avalanche.json,./cookbook/specs/fvm.json --lava-dev-test -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE & +wait_next_block +wait_next_block +lavad tx gov vote 1 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +sleep 4 + +# Plans proposal +lavad tx gov submit-legacy-proposal plans-add ./cookbook/plans/test_plans/default.json,./cookbook/plans/test_plans/temporary-add.json -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +wait_next_block +wait_next_block +lavad tx gov vote 2 yes -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE + +sleep 4 + +CLIENTSTAKE="500000000000ulava" +PROVIDERSTAKE="500000000000ulava" + +PROVIDER1_LISTENER="127.0.0.1:2220" +PROVIDER2_LISTENER="127.0.0.1:2221" + +lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE +wait_next_block +# lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --from servicer1 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE + +lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER2_LISTENER,1" 1 $(operator_address) -y --from servicer2 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE + +sleep_until_next_epoch +wait_next_block + +# screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ +# $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ +# $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ +# $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ +# $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 + +screen -d -m -S provider2 bash -c "source ~/.bashrc; lavap rpcprovider \ +$PROVIDER2_LISTENER LAV1 rest 'http://localhost:5555' \ +$PROVIDER2_LISTENER LAV1 tendermintrpc 'http://localhost:5556,http://localhost:5556' \ +$PROVIDER2_LISTENER LAV1 grpc '$LAVA_GRPC' \ +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer2 --chain-id lava --metrics-listen-address ":7775" 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25 + +wait_next_block + +screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ +127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ +$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 + + + + +echo "--- setting up screens done ---" +screen -ls \ No newline at end of file