Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance Degradation in Long-Running Spanner Emulator Sessions #159

Open
kt81 opened this issue Feb 20, 2024 · 3 comments
Open

Performance Degradation in Long-Running Spanner Emulator Sessions #159

kt81 opened this issue Feb 20, 2024 · 3 comments

Comments

@kt81
Copy link

kt81 commented Feb 20, 2024

I've been utilizing the Spanner Emulator for testing purposes within my environment, paired with PHPUnit.
The feature set has been performing exceptionally well.
However, I've noticed a significant slowdown during prolonged testing sessions. Specifically, the slowdown appears to coincide with the creation of new connections.

For reference, please see the related issue #72 and the reproduction project at https://github.com/kt81/spanner-emu-tester.

Below is an example of the performance degradation observed:

...
$ make run
13.6603ms
17.372299ms
31.821499ms
46.6992ms
55.764599ms
70.637199ms
83.588399ms
92.9404ms
107.0446ms
116.8905ms
128.591101ms
146.4209ms
153.661801ms
166.057102ms
...

This pattern suggests a gradual decrease in performance, particularly with an increase in the number of new connections over time. Any insights or suggestions on how to mitigate this issue would be greatly appreciated.

@egonelbre
Copy link

Here's a plot of 10k individual connections and query time of SELECT count(1) FROM projects in spanner emulator 1.5.18 on Windows.

image

If I do 128 clients concurrently (on a 32core machine), I get this density plot for the query times:

image

@egonelbre
Copy link

egonelbre commented Jun 13, 2024

It looks like the sessions are not being closed properly. I'm not sure whether it's an issue in the emulator or cloud.google.com/go/spanner library.

After making 1000 SELECT 1 calls, the emulator ends up having ~80000 open sessions:

0%  last:74.909ms
10%  last:247.844ms
30%  last:171.7028ms
40%  last:139.8648ms
50%  last:121.9485ms
60%  last:202.7864ms
70%  last:178.8575ms
80%  last:80.0961ms
90%  last:212.2422ms
20%  last:3.5342151s
SESSIONS 79425

Doing a single query ends up with 100 sessions, so I'm guessing there's a batch of 100 sessions created, which are not cleaned up.

Below is the testing code.

Testing Code
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"time"

	"cloud.google.com/go/spanner"
	database "cloud.google.com/go/spanner/admin/database/apiv1"
	"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
	"cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
	spannerdirect "cloud.google.com/go/spanner/apiv1"
	"cloud.google.com/go/spanner/apiv1/spannerpb"
	"golang.org/x/sync/errgroup"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
	"google.golang.org/api/option/internaloption"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// main runs the reproduction and exits with status 1 if it fails.
func main() {
	if err := run(); err != nil {
		// Fprintln terminates the message with a newline so the shell prompt
		// does not end up glued to the error text.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

func run() error {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	const (
		ProjectID, InstanceID, DatabaseID = "proj", "inst", "db"

		Database = "projects/" + ProjectID + "/instances/" + InstanceID + "/databases/" + DatabaseID
	)

	{ // create a new instance
		admin, err := instance.NewInstanceAdminClient(ctx)
		if err != nil {
			return fmt.Errorf("failed to create instance admin: %w", err)
		}
		op, err := admin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
			Parent:     "projects/" + ProjectID,
			InstanceId: InstanceID,
		})
		if err != nil {
			if spanner.ErrCode(err) == codes.AlreadyExists {
				goto instanceReady
			}
			return fmt.Errorf("create instance failed: %w", err)
		}
		if _, err := op.Wait(ctx); err != nil {
			return fmt.Errorf("failed to wait instance creation: %w", err)
		}

	instanceReady:
		if err := admin.Close(); err != nil {
			return fmt.Errorf("failed to close instance admin: %w", err)
		}
	}

	{ // create a database
		admin, err := database.NewDatabaseAdminClient(ctx)
		if err != nil {
			return fmt.Errorf("failed to create database admin: %w", err)
		}

		ddl, err := admin.CreateDatabase(ctx, &databasepb.CreateDatabaseRequest{
			Parent:          "projects/" + ProjectID + "/instances/" + InstanceID,
			DatabaseDialect: databasepb.DatabaseDialect_GOOGLE_STANDARD_SQL,
			CreateStatement: "CREATE DATABASE " + DatabaseID,
			ExtraStatements: []string{},
		})
		if err != nil {
			if spanner.ErrCode(err) == codes.AlreadyExists {
				goto databaseReady
			}
			return fmt.Errorf("failed to create database: %w", err)
		}

		if _, err := ddl.Wait(ctx); err != nil {
			return fmt.Errorf("failed to wait database creation: %w", err)
		}

	databaseReady:
		if err := admin.Close(); err != nil {
			return fmt.Errorf("failed to close database admin: %w", err)
		}
	}

	const N = 1000

	var group errgroup.Group
	group.SetLimit(128)

	for k := 0; k < N; k++ {
		if ctx.Err() != nil {
			break
		}

		k := k
		group.Go(func() error {
			client, err := spanner.NewClient(ctx, Database)
			if err != nil {
				return fmt.Errorf("failed to create client: %w", err)
			}

			start := time.Now()
			err = client.Single().Query(ctx, spanner.Statement{SQL: `SELECT 1`}).Do(
				func(row *spanner.Row) error {
					var v int64
					return row.Columns(&v)
				})
			if err != nil {
				return fmt.Errorf("failed to query: %w", err)
			}
			finish := time.Now()

			client.Close()

			if k%100 == 0 { // just a few random samples
				fmt.Printf("%v%%  last:%v\n", k*100/N, finish.Sub(start))
			}

			return nil
		})
	}

	if err := group.Wait(); err != nil {
		return err
	}

	count, err := fetchSessionCount(ctx, Database)
	if err != nil {
		return fmt.Errorf("failed to fetch count: %w", err)
	}

	fmt.Printf("SESSIONS %v\n", count)

	return nil
}

// fetchSessionCount opens a low-level Spanner client against the address in
// SPANNER_EMULATOR_HOST (insecure, unauthenticated, blocking dial) and returns
// the number of sessions listed for database.
//
// The results are named so that the deferred Close can actually report its
// error: with unnamed results the assignment in the defer only touched a
// local variable and the Close error was lost.
func fetchSessionCount(ctx context.Context, database string) (count int64, err error) {
	client, err := spannerdirect.NewClient(ctx,
		option.WithEndpoint(os.Getenv("SPANNER_EMULATOR_HOST")),
		option.WithGRPCDialOption(grpc.WithInsecure()),
		option.WithoutAuthentication(),
		option.WithGRPCDialOption(grpc.WithBlock()),
		internaloption.SkipDialSettingsValidation(),
	)
	if err != nil {
		return 0, fmt.Errorf("failed to create monitoring client: %w", err)
	}
	defer func() {
		if cerr := client.Close(); cerr != nil {
			err = errors.Join(err, cerr)
			count = 0 // do not hand back a value alongside an error
		}
	}()

	count, err = countSessions(ctx, client, database)
	if err != nil {
		return 0, fmt.Errorf("count sessions failed: %w", err)
	}

	return count, nil
}

// countSessions returns how many sessions ListSessions reports for database.
// It returns 0 together with a wrapped error if iteration fails part-way.
func countSessions(ctx context.Context, client *spannerdirect.Client, database string) (int64, error) {
	req := &spannerpb.ListSessionsRequest{
		Database: database,
		// Large page size to keep the number of round trips low.
		// NOTE(review): the generated iterator is expected to request further
		// pages by itself as Next is called, so no manual PageToken handling
		// should be needed — confirm against the client library docs.
		PageSize: 1e6,
	}

	var count int64
	it := client.ListSessions(ctx, req)
	for {
		// Only the number of sessions matters; the session itself is ignored.
		_, err := it.Next()
		if err == iterator.Done {
			return count, nil
		}
		if err != nil {
			return 0, fmt.Errorf("failed to iterate: %w", err)
		}
		count++
	}
}

@egonelbre
Copy link

So, while trying to diagnose which part of the system is at fault, I tried using the gRPC API directly to create sessions, without the cloud.google.com/go/spanner library. Manually calling BatchCreateSessions and DeleteSession seems to leave the session count properly at 0.

I'm not sure which part should be responsible for cleaning up those sessions, either:

  1. cloud.google.com/go/spanner should be always calling DeleteSession for every single created session. This would be weird, because then crashed services would leave sessions around.
  2. Emulator should clean up the sessions automatically when a connection drops.

https://cloud.google.com/spanner/docs/sessions#go seems to indicate it should be #1.

Testing Code
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"

	"cloud.google.com/go/spanner"
	database "cloud.google.com/go/spanner/admin/database/apiv1"
	"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
	instance "cloud.google.com/go/spanner/admin/instance/apiv1"
	"cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
	spannerdirect "cloud.google.com/go/spanner/apiv1"
	"cloud.google.com/go/spanner/apiv1/spannerpb"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"
	"google.golang.org/api/option/internaloption"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// main runs the direct-API session check and exits with status 1 on failure.
func main() {
	if err := run(); err != nil {
		// Fprintln terminates the message with a newline so the shell prompt
		// does not end up glued to the error text.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

func run() error {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	const (
		ProjectID, InstanceID, DatabaseID = "proj", "inst", "db"

		Database = "projects/" + ProjectID + "/instances/" + InstanceID + "/databases/" + DatabaseID
	)

	{ // create a new instance
		admin, err := instance.NewInstanceAdminClient(ctx)
		if err != nil {
			return fmt.Errorf("failed to create instance admin: %w", err)
		}
		op, err := admin.CreateInstance(ctx, &instancepb.CreateInstanceRequest{
			Parent:     "projects/" + ProjectID,
			InstanceId: InstanceID,
		})
		if err != nil {
			if spanner.ErrCode(err) == codes.AlreadyExists {
				goto instanceReady
			}
			return fmt.Errorf("create instance failed: %w", err)
		}
		if _, err := op.Wait(ctx); err != nil {
			return fmt.Errorf("failed to wait instance creation: %w", err)
		}

	instanceReady:
		if err := admin.Close(); err != nil {
			return fmt.Errorf("failed to close instance admin: %w", err)
		}
	}

	{ // create a database
		admin, err := database.NewDatabaseAdminClient(ctx)
		if err != nil {
			return fmt.Errorf("failed to create database admin: %w", err)
		}

		ddl, err := admin.CreateDatabase(ctx, &databasepb.CreateDatabaseRequest{
			Parent:          "projects/" + ProjectID + "/instances/" + InstanceID,
			DatabaseDialect: databasepb.DatabaseDialect_GOOGLE_STANDARD_SQL,
			CreateStatement: "CREATE DATABASE " + DatabaseID,
			ExtraStatements: []string{},
		})
		if err != nil {
			if spanner.ErrCode(err) == codes.AlreadyExists {
				goto databaseReady
			}
			return fmt.Errorf("failed to create database: %w", err)
		}

		if _, err := ddl.Wait(ctx); err != nil {
			return fmt.Errorf("failed to wait database creation: %w", err)
		}

	databaseReady:
		if err := admin.Close(); err != nil {
			return fmt.Errorf("failed to close database admin: %w", err)
		}
	}

	return withDirectClient(ctx, func(client *spannerdirect.Client) error {
		{
			count, err := countSessions(ctx, client, Database)
			if err != nil {
				return fmt.Errorf("failed countSessions: %w", err)
			}
			fmt.Println("STARTING SESSION COUNT", count)
		}

		var activeSessions []*spannerpb.Session

		{
			response, err := client.BatchCreateSessions(ctx, &spannerpb.BatchCreateSessionsRequest{
				SessionCount:    100,
				Database:        Database,
				SessionTemplate: &spannerpb.Session{},
			})
			if err != nil {
				return fmt.Errorf("failed BatchCreateSessions: %w", err)
			}

			fmt.Printf("created %d sessions\n", len(response.Session))

			activeSessions = response.Session
		}

		{
			for _, session := range activeSessions {
				err := client.DeleteSession(ctx, &spannerpb.DeleteSessionRequest{
					Name: session.Name,
				})
				if err != nil {
					return fmt.Errorf("failed to delete session: %w", err)
				}
			}
		}

		{
			count, err := countSessions(ctx, client, Database)
			if err != nil {
				return fmt.Errorf("failed countSessions: %w", err)
			}
			fmt.Println("FINAL SESSION COUNT", count)
		}

		return nil
	})
}

// withDirectClient dials the address in SPANNER_EMULATOR_HOST with a low-level
// Spanner client (insecure, unauthenticated, blocking dial), passes the client
// to fn, and closes it afterwards. The returned error joins fn's error with
// any error reported by Close.
func withDirectClient(ctx context.Context, fn func(*spannerdirect.Client) error) (err error) {
	direct, dialErr := spannerdirect.NewClient(ctx,
		option.WithEndpoint(os.Getenv("SPANNER_EMULATOR_HOST")),
		option.WithGRPCDialOption(grpc.WithInsecure()),
		option.WithoutAuthentication(),
		option.WithGRPCDialOption(grpc.WithBlock()),
		internaloption.SkipDialSettingsValidation(),
	)
	if dialErr != nil {
		return fmt.Errorf("failed to create direct client: %w", dialErr)
	}

	// The named result lets the deferred close fold its error into whatever
	// fn returned.
	defer func() {
		closeErr := direct.Close()
		err = errors.Join(err, closeErr)
	}()

	err = fn(direct)
	return err
}

// countSessions returns how many sessions ListSessions reports for database.
// It returns 0 together with a wrapped error if iteration fails part-way.
func countSessions(ctx context.Context, client *spannerdirect.Client, database string) (int64, error) {
	req := &spannerpb.ListSessionsRequest{
		Database: database,
		// Large page size to keep the number of round trips low.
		// NOTE(review): the generated iterator is expected to request further
		// pages by itself as Next is called, so no manual PageToken handling
		// should be needed — confirm against the client library docs.
		PageSize: 1e6,
	}

	var count int64
	it := client.ListSessions(ctx, req)
	for {
		// Only the number of sessions matters; the session itself is ignored.
		_, err := it.Next()
		if err == iterator.Done {
			return count, nil
		}
		if err != nil {
			return 0, fmt.Errorf("failed to iterate: %w", err)
		}
		count++
	}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants