diff --git a/pkg/csi_driver/node.go b/pkg/csi_driver/node.go index df3eb562d..c14dbd831 100644 --- a/pkg/csi_driver/node.go +++ b/pkg/csi_driver/node.go @@ -29,6 +29,7 @@ import ( "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/util" "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook" "golang.org/x/net/context" + "golang.org/x/time/rate" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" @@ -60,6 +61,7 @@ type nodeServer struct { mounter mount.Interface volumeLocks *util.VolumeLocks k8sClients clientset.Interface + limiter rate.Limiter } func newNodeServer(driver *GCSDriver, mounter mount.Interface) csi.NodeServer { @@ -69,6 +71,7 @@ func newNodeServer(driver *GCSDriver, mounter mount.Interface) csi.NodeServer { mounter: mounter, volumeLocks: util.NewVolumeLocks(), k8sClients: driver.config.K8sClients, + limiter: *rate.NewLimiter(rate.Every(time.Second), 5), } } @@ -85,6 +88,11 @@ func (s *nodeServer) NodeGetCapabilities(_ context.Context, _ *csi.NodeGetCapabi } func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + // Rate limit NodePublishVolume calls to avoid kube API throttling. + if s.limiter.Wait(ctx) != nil { + return nil, status.Errorf(codes.Aborted, "NodePublishVolume request is aborted due to rate limit") + } + // Validate arguments bucketName := req.GetVolumeId() vc := req.GetVolumeContext()