From 055371ce601f8347882602e8ee9e6315b3013938 Mon Sep 17 00:00:00 2001 From: Ian Chen Date: Fri, 21 Apr 2023 16:45:06 +0800 Subject: [PATCH] bugfix: prevent memory leak caused by time.After (#65) * bugfix: prevent memory leak caused by time.After * fix golangci error * update actions yaml --- .github/workflows/golangci-lint.yml | 9 +++++++-- cmd/pfcpctl/main.go | 1 + cmd/pfcpsim/main.go | 1 + internal/pfcpctl/commands/helpers.go | 7 +------ internal/pfcpctl/commands/services.go | 3 +++ internal/pfcpctl/commands/sessions.go | 3 +++ internal/pfcpsim/helpers.go | 2 ++ internal/pfcpsim/server.go | 10 +++++++++- pkg/pfcpsim/pfcpsim.go | 26 +++++++++++++++++++++----- 9 files changed, 48 insertions(+), 14 deletions(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 035977c..bfce99a 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -13,9 +13,14 @@ jobs: name: lint runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2.4.0 + - uses: actions/checkout@v3 + - uses: actions/setup-go@v3 + with: + go-version-file: go.mod + cache: true + cache-dependency-path: go.sum - name: golangci-lint - uses: golangci/golangci-lint-action@v2.5.2 + uses: golangci/golangci-lint-action@v3 with: version: v1.47.3 working-directory: pkg/pfcpsim diff --git a/cmd/pfcpctl/main.go b/cmd/pfcpctl/main.go index ce575b7..9264095 100644 --- a/cmd/pfcpctl/main.go +++ b/cmd/pfcpctl/main.go @@ -17,6 +17,7 @@ func main() { parser := flags.NewNamedParser(path.Base(os.Args[0]), flags.HelpFlag|flags.PassDoubleDash|flags.PassAfterNonOption) _, err := parser.AddGroup("Global Options", "", &config.GlobalOptions) + if err != nil { panic(err) } diff --git a/cmd/pfcpsim/main.go b/cmd/pfcpsim/main.go index 546aaa0..e8c4775 100644 --- a/cmd/pfcpsim/main.go +++ b/cmd/pfcpsim/main.go @@ -58,6 +58,7 @@ func main() { optHelp := getopt.BoolLong("help", 0, "Help") getopt.Parse() + if *optHelp { getopt.Usage() os.Exit(0) diff --git a/internal/pfcpctl/commands/helpers.go b/internal/pfcpctl/commands/helpers.go index c4f9549..1b736c6 100644 --- a/internal/pfcpctl/commands/helpers.go +++ b/internal/pfcpctl/commands/helpers.go @@ -15,8 +15,7 @@ var conn *grpc.ClientConn func connect() pb.PFCPSimClient { // Create an insecure gRPC Channel - var err error - conn, err = grpc.Dial(config.GlobalConfig.Server, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(config.GlobalConfig.Server, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("Error dialing %v: %v", config.GlobalConfig.Server, err) } @@ -24,10 +23,6 @@ func connect() pb.PFCPSimClient { return pb.NewPFCPSimClient(conn) } -func validateArgs(args *commonArgs) { - -} - func disconnect() { if conn != nil { conn.Close() diff --git a/internal/pfcpctl/commands/services.go b/internal/pfcpctl/commands/services.go index 735b8d4..59f2e8c 100644 --- a/internal/pfcpctl/commands/services.go +++ b/internal/pfcpctl/commands/services.go @@ -30,6 +30,7 @@ func RegisterServiceCommands(parser *flags.Parser) { func (c *configureRemoteAddresses) Execute(args []string) error { client := connect() + defer disconnect() res, err := client.Configure(context.Background(), &pb.ConfigureRequest{ @@ -48,6 +49,7 @@ func (c *configureRemoteAddresses) Execute(args []string) error { func (c *associate) Execute(args []string) error { client := connect() + defer disconnect() res, err := client.Associate(context.Background(), &pb.EmptyRequest{}) @@ -62,6 +64,7 @@ func (c *associate) Execute(args []string) error { func (c *disassociate) Execute(args []string) error { client := connect() + defer disconnect() res, err := client.Disassociate(context.Background(), &pb.EmptyRequest{}) diff --git a/internal/pfcpctl/commands/sessions.go b/internal/pfcpctl/commands/sessions.go index c60b6f6..a0e914e 100644 --- a/internal/pfcpctl/commands/sessions.go +++ b/internal/pfcpctl/commands/sessions.go @@ -66,6 +66,7 @@ func (s *sessionCreate) Execute(args []string) error { } client := connect() + defer disconnect() s.Args.validate() @@ -90,6 +91,7 @@ func (s *sessionCreate) Execute(args []string) error { func (s *sessionModify) Execute(args []string) error { client := connect() + defer disconnect() s.Args.validate() @@ -115,6 +117,7 @@ func (s *sessionModify) Execute(args []string) error { func (s *sessionDelete) Execute(args []string) error { client := connect() + defer disconnect() s.Args.validate() diff --git a/internal/pfcpsim/helpers.go b/internal/pfcpsim/helpers.go index 63e5b85..ea06c3a 100644 --- a/internal/pfcpsim/helpers.go +++ b/internal/pfcpsim/helpers.go @@ -109,6 +109,7 @@ func parseAppFilter(filter string) (string, uint8, uint32, error) { proto, ipNetAddr, portRange, action, precedence := result[0], result[1], result[2], result[3], result[4] var gateStatus uint8 + switch action { case "allow": gateStatus = ie.GateStatusOpen @@ -155,6 +156,7 @@ func parseAppFilter(filter string) (string, uint8, uint32, error) { if lowerPort > upperPort { return "", 0, 0, pfcpsim.NewInvalidFormatError("Port range. Lower port is greater than upper port") } + return fmt.Sprintf(sdfFilterFormatWPort, proto, ipNetAddr, lowerPort, upperPort), gateStatus, precedenceUint, nil } else { return fmt.Sprintf(sdfFilterFormatWOPort, proto, ipNetAddr), gateStatus, precedenceUint, nil diff --git a/internal/pfcpsim/server.go b/internal/pfcpsim/server.go index 010cc5a..f996403 100644 --- a/internal/pfcpsim/server.go +++ b/internal/pfcpsim/server.go @@ -49,6 +49,7 @@ func (P pfcpSimService) Configure(ctx context.Context, request *pb.ConfigureRequ if net.ParseIP(request.UpfN3Address) == nil { errMsg := fmt.Sprintf("Error while parsing UPF N3 address: %v", request.UpfN3Address) log.Error(errMsg) + return &pb.Response{}, status.Error(codes.Aborted, errMsg) } // remotePeerAddress is validated in pfcpsim @@ -74,6 +75,7 @@ func (P pfcpSimService) Associate(ctx context.Context, empty *pb.EmptyRequest) ( if err := connectPFCPSim(); err != nil { errMsg := fmt.Sprintf("Could not connect to remote peer :%v", err) log.Error(errMsg) + return &pb.Response{}, status.Error(codes.Aborted, errMsg) } } @@ -127,6 +129,7 @@ func (P pfcpSimService) CreateSession(ctx context.Context, request *pb.CreateSes if err != nil { errMsg := fmt.Sprintf(" Could not parse Address Pool: %v", err) log.Error(errMsg) + return &pb.Response{}, status.Error(codes.Aborted, errMsg) } @@ -255,6 +258,7 @@ func (P pfcpSimService) CreateSession(ctx context.Context, request *pb.CreateSes if err != nil { return &pb.Response{}, status.Error(codes.Internal, err.Error()) } + insertSession(i, sess) } @@ -280,6 +284,7 @@ func (P pfcpSimService) ModifySession(ctx context.Context, request *pb.ModifySes if len(activeSessions) < count { err := pfcpsim.NewNotEnoughSessionsError() log.Error(err) + return &pb.Response{}, status.Error(codes.Aborted, err.Error()) } @@ -308,7 +313,7 @@ func (P pfcpSimService) ModifySession(ctx context.Context, request *pb.ModifySes teid = 0 // When buffering, TEID = 0. } - for _, _ = range request.AppFilters { + for range request.AppFilters { downlinkFAR := session.NewFARBuilder(). WithID(ID). // Same FARID that was generated in create sessions WithMethod(session.Update). @@ -327,6 +332,7 @@ func (P pfcpSimService) ModifySession(ctx context.Context, request *pb.ModifySes if !ok { errMsg := fmt.Sprintf("Could not retrieve session with index %v", i) log.Error(errMsg) + return &pb.Response{}, status.Error(codes.Internal, errMsg) } @@ -356,6 +362,7 @@ func (P pfcpSimService) DeleteSession(ctx context.Context, request *pb.DeleteSes if len(activeSessions) < count { err := pfcpsim.NewNotEnoughSessionsError() log.Error(err) + return &pb.Response{}, status.Error(codes.Aborted, err.Error()) } @@ -364,6 +371,7 @@ func (P pfcpSimService) DeleteSession(ctx context.Context, request *pb.DeleteSes if !ok { errMsg := "Session was nil. Check baseID" log.Error(errMsg) + return &pb.Response{}, status.Error(codes.Aborted, errMsg) } diff --git a/pkg/pfcpsim/pfcpsim.go b/pkg/pfcpsim/pfcpsim.go index b425685..0c93451 100644 --- a/pkg/pfcpsim/pfcpsim.go +++ b/pkg/pfcpsim/pfcpsim.go @@ -180,11 +180,27 @@ func (c *PFCPClient) PeekNextHeartbeatResponse() (*message.HeartbeatResponse, er // It's a blocking operation, which is timed out after c.responseTimeout period (5 seconds by default). // Use SetPFCPResponseTimeout() to configure a custom timeout. func (c *PFCPClient) PeekNextResponse() (message.Message, error) { - select { - case msg := <-c.recvChan: - return msg, nil - case <-time.After(c.responseTimeout): - return nil, NewTimeoutExpiredError() + var resMsg message.Message + + var err error + + delay := time.NewTimer(c.responseTimeout) + + for { + select { + case msg := <-c.recvChan: + if !delay.Stop() { + <-delay.C + } + + resMsg = msg + case <-delay.C: + if resMsg == nil { + err = NewTimeoutExpiredError() + } + + return resMsg, err + } } }