diff --git a/cmd/manager/config.yml b/cmd/manager/config.yml index 73d3ed17..560aacf8 100644 --- a/cmd/manager/config.yml +++ b/cmd/manager/config.yml @@ -37,6 +37,8 @@ destination: expiration: 0 manager: + grpc_service_address: ":8085" + http_address: ":8080" internal_broker_config: channel_size: 1000000 number_instant: 5 diff --git a/cmd/manager/main.go b/cmd/manager/main.go index a7d8c150..6e4da1be 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -1,17 +1,24 @@ package main import ( + "context" "fmt" "log/slog" + "os" + "os/signal" "sync" + "syscall" + "time" "github.com/ormushq/ormus/config" "github.com/ormushq/ormus/logger" + "github.com/ormushq/ormus/manager/delivery/grpcserver" "github.com/ormushq/ormus/manager/delivery/httpserver" "github.com/ormushq/ormus/manager/delivery/httpserver/userhandler" "github.com/ormushq/ormus/manager/managerparam" "github.com/ormushq/ormus/manager/mockRepo/projectstub" "github.com/ormushq/ormus/manager/repository/scyllarepo" + "github.com/ormushq/ormus/manager/repository/sourcerepo" "github.com/ormushq/ormus/manager/service/authservice" "github.com/ormushq/ormus/manager/service/projectservice" "github.com/ormushq/ormus/manager/service/userservice" @@ -53,9 +60,41 @@ func main() { userHand := userhandler.New(userSvc, validateUserSvc, ProjectSvc) workers.New(ProjectSvc, internalBroker).Run(done, &wg) - server := httpserver.New(cfg, httpserver.SetupServicesResponse{ + httpServer := httpserver.New(cfg, httpserver.SetupServicesResponse{ UserHandler: userHand, }) - server.Server() + sourceRepo := sourcerepo.New(scylla) + + grpcServer := grpcserver.New(sourceRepo, cfg) + + if err := httpServer.Start(); err != nil { + logger.L().Error("Failed to start manager HTTP server", err) + os.Exit(1) + } + + if err := grpcServer.Start(); err != nil { + logger.L().Error("Failed to start gRPC server", "error", err) + os.Exit(1) + } + + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt, syscall.SIGTERM) + <-quit + logger.L().Info("Shutting down manager server...") + + const timeoutDuration = 10 * time.Second + + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + defer cancel() + + if err := httpServer.Shutdown(ctx); err != nil { + logger.L().Error("manager HTTP server forced to shutdown", "error", err) + } + + if err := grpcServer.Stop(ctx); err != nil { + logger.L().Error("manager gRPC server forced to shutdown", "error", err) + } + + logger.L().Info("Server exiting") } diff --git a/contract/protobuf/manager/goproto/writekey/writekey.pb.go b/contract/protobuf/manager/goproto/writekey/writekey.pb.go new file mode 100644 index 00000000..c818b63f --- /dev/null +++ b/contract/protobuf/manager/goproto/writekey/writekey.pb.go @@ -0,0 +1,396 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v3.12.4 +// source: writekey.proto + +package writekey + +import ( + timestamp "github.com/golang/protobuf/ptypes/timestamp" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type WriteKeyStatus int32 + +const ( + WriteKeyStatus_INACTIVE WriteKeyStatus = 0 + WriteKeyStatus_ACTIVE WriteKeyStatus = 1 +) + +// Enum value maps for WriteKeyStatus. +var ( + WriteKeyStatus_name = map[int32]string{ + 0: "INACTIVE", + 1: "ACTIVE", + } + WriteKeyStatus_value = map[string]int32{ + "INACTIVE": 0, + "ACTIVE": 1, + } +) + +func (x WriteKeyStatus) Enum() *WriteKeyStatus { + p := new(WriteKeyStatus) + *p = x + return p +} + +func (x WriteKeyStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (WriteKeyStatus) Descriptor() protoreflect.EnumDescriptor { + return file_writekey_proto_enumTypes[0].Descriptor() +} + +func (WriteKeyStatus) Type() protoreflect.EnumType { + return &file_writekey_proto_enumTypes[0] +} + +func (x WriteKeyStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use WriteKeyStatus.Descriptor instead. +func (WriteKeyStatus) EnumDescriptor() ([]byte, []int) { + return file_writekey_proto_rawDescGZIP(), []int{0} +} + +type WriteKeyMetaData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + WriteKey string `protobuf:"bytes,1,opt,name=write_key,json=writeKey,proto3" json:"write_key,omitempty"` + OwnerId string `protobuf:"bytes,2,opt,name=owner_id,json=ownerId,proto3" json:"owner_id,omitempty"` + SourceId string `protobuf:"bytes,3,opt,name=source_id,json=sourceId,proto3" json:"source_id,omitempty"` + CreatedAt *timestamp.Timestamp `protobuf:"bytes,4,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` + LastUsedAt *timestamp.Timestamp `protobuf:"bytes,5,opt,name=last_used_at,json=lastUsedAt,proto3" json:"last_used_at,omitempty"` + Status WriteKeyStatus `protobuf:"varint,6,opt,name=status,proto3,enum=writekey.WriteKeyStatus" json:"status,omitempty"` +} + +func (x *WriteKeyMetaData) Reset() { + *x = WriteKeyMetaData{} + if protoimpl.UnsafeEnabled { + mi := &file_writekey_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WriteKeyMetaData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteKeyMetaData) ProtoMessage() {} + +func (x *WriteKeyMetaData) ProtoReflect() protoreflect.Message { + mi := &file_writekey_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteKeyMetaData.ProtoReflect.Descriptor instead. +func (*WriteKeyMetaData) Descriptor() ([]byte, []int) { + return file_writekey_proto_rawDescGZIP(), []int{0} +} + +func (x *WriteKeyMetaData) GetWriteKey() string { + if x != nil { + return x.WriteKey + } + return "" +} + +func (x *WriteKeyMetaData) GetOwnerId() string { + if x != nil { + return x.OwnerId + } + return "" +} + +func (x *WriteKeyMetaData) GetSourceId() string { + if x != nil { + return x.SourceId + } + return "" +} + +func (x *WriteKeyMetaData) GetCreatedAt() *timestamp.Timestamp { + if x != nil { + return x.CreatedAt + } + return nil +} + +func (x *WriteKeyMetaData) GetLastUsedAt() *timestamp.Timestamp { + if x != nil { + return x.LastUsedAt + } + return nil +} + +func (x *WriteKeyMetaData) GetStatus() WriteKeyStatus { + if x != nil { + return x.Status + } + return WriteKeyStatus_INACTIVE +} + +type GetWriteKeyRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + WriteKey string `protobuf:"bytes,1,opt,name=write_key,json=writeKey,proto3" json:"write_key,omitempty"` +} + +func (x *GetWriteKeyRequest) Reset() { + *x = GetWriteKeyRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_writekey_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetWriteKeyRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetWriteKeyRequest) ProtoMessage() {} + +func (x *GetWriteKeyRequest) ProtoReflect() protoreflect.Message { + mi := &file_writekey_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetWriteKeyRequest.ProtoReflect.Descriptor instead. +func (*GetWriteKeyRequest) Descriptor() ([]byte, []int) { + return file_writekey_proto_rawDescGZIP(), []int{1} +} + +func (x *GetWriteKeyRequest) GetWriteKey() string { + if x != nil { + return x.WriteKey + } + return "" +} + +type GetWriteKeyResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Metadata *WriteKeyMetaData `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` +} + +func (x *GetWriteKeyResponse) Reset() { + *x = GetWriteKeyResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_writekey_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetWriteKeyResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetWriteKeyResponse) ProtoMessage() {} + +func (x *GetWriteKeyResponse) ProtoReflect() protoreflect.Message { + mi := &file_writekey_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetWriteKeyResponse.ProtoReflect.Descriptor instead. +func (*GetWriteKeyResponse) Descriptor() ([]byte, []int) { + return file_writekey_proto_rawDescGZIP(), []int{2} +} + +func (x *GetWriteKeyResponse) GetMetadata() *WriteKeyMetaData { + if x != nil { + return x.Metadata + } + return nil +} + +var File_writekey_proto protoreflect.FileDescriptor + +var file_writekey_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x77, 0x72, 0x69, 0x74, 0x65, 0x6b, 0x65, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x08, 0x77, 0x72, 0x69, 0x74, 0x65, 0x6b, 0x65, 0x79, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x92, 0x02, 0x0a, 0x10, + 0x57, 0x72, 0x69, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, + 0x12, 0x1b, 0x0a, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x72, 0x69, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x19, 0x0a, + 0x08, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x39, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, + 0x5f, 0x61, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, + 0x12, 0x3c, 0x0a, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x75, 0x73, 0x65, 0x64, 0x5f, 0x61, 0x74, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x52, 0x0a, 0x6c, 0x61, 0x73, 0x74, 0x55, 0x73, 0x65, 0x64, 0x41, 0x74, 0x12, 0x30, + 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x18, + 0x2e, 0x77, 0x72, 0x69, 0x74, 0x65, 0x6b, 0x65, 0x79, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4b, + 0x65, 0x79, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, + 0x22, 0x31, 0x0a, 0x12, 0x47, 0x65, 0x74, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, + 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x72, 0x69, 0x74, 0x65, + 0x4b, 0x65, 0x79, 0x22, 0x4d, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4b, + 0x65, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, + 0x72, 0x69, 0x74, 0x65, 0x6b, 0x65, 0x79, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4b, 0x65, 0x79, + 0x4d, 0x65, 0x74, 0x61, 0x44, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x2a, 0x2a, 0x0a, 0x0e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x12, 0x0c, 0x0a, 0x08, 0x49, 0x4e, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, + 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x10, 0x01, 0x32, 0x5d, + 0x0a, 0x0f, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, + 0x72, 0x12, 0x4a, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x57, 0x72, 0x69, 0x74, 0x65, 0x4b, 0x65, 0x79, + 0x12, 0x1c, 0x2e, 0x77, 0x72, 0x69, 0x74, 0x65, 0x6b, 0x65, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x57, + 0x72, 0x69, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, + 0x2e, 0x77, 0x72, 0x69, 0x74, 0x65, 0x6b, 0x65, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x57, 0x72, 0x69, + 0x74, 0x65, 0x4b, 0x65, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x45, 0x5a, + 0x43, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x72, 0x6d, 0x75, + 0x73, 0x68, 0x71, 0x2f, 0x6f, 0x72, 0x6d, 0x75, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, + 0x63, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x6d, 0x61, 0x6e, 0x61, + 0x67, 0x65, 0x72, 0x2f, 0x67, 0x6f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x77, 0x72, 0x69, 0x74, + 0x65, 0x6b, 0x65, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_writekey_proto_rawDescOnce sync.Once + file_writekey_proto_rawDescData = file_writekey_proto_rawDesc +) + +func file_writekey_proto_rawDescGZIP() []byte { + file_writekey_proto_rawDescOnce.Do(func() { + file_writekey_proto_rawDescData = protoimpl.X.CompressGZIP(file_writekey_proto_rawDescData) + }) + return file_writekey_proto_rawDescData +} + +var file_writekey_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_writekey_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_writekey_proto_goTypes = []any{ + (WriteKeyStatus)(0), // 0: writekey.WriteKeyStatus + (*WriteKeyMetaData)(nil), // 1: writekey.WriteKeyMetaData + (*GetWriteKeyRequest)(nil), // 2: writekey.GetWriteKeyRequest + (*GetWriteKeyResponse)(nil), // 3: writekey.GetWriteKeyResponse + (*timestamp.Timestamp)(nil), // 4: google.protobuf.Timestamp +} +var file_writekey_proto_depIdxs = []int32{ + 4, // 0: writekey.WriteKeyMetaData.created_at:type_name -> google.protobuf.Timestamp + 4, // 1: writekey.WriteKeyMetaData.last_used_at:type_name -> google.protobuf.Timestamp + 0, // 2: writekey.WriteKeyMetaData.status:type_name -> writekey.WriteKeyStatus + 1, // 3: writekey.GetWriteKeyResponse.metadata:type_name -> writekey.WriteKeyMetaData + 2, // 4: writekey.WriteKeyManager.GetWriteKey:input_type -> writekey.GetWriteKeyRequest + 3, // 5: writekey.WriteKeyManager.GetWriteKey:output_type -> writekey.GetWriteKeyResponse + 5, // [5:6] is the sub-list for method output_type + 4, // [4:5] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_writekey_proto_init() } +func file_writekey_proto_init() { + if File_writekey_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_writekey_proto_msgTypes[0].Exporter = func(v any, i int) any { + switch v := v.(*WriteKeyMetaData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_writekey_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*GetWriteKeyRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_writekey_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*GetWriteKeyResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_writekey_proto_rawDesc, + NumEnums: 1, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_writekey_proto_goTypes, + DependencyIndexes: file_writekey_proto_depIdxs, + EnumInfos: file_writekey_proto_enumTypes, + MessageInfos: file_writekey_proto_msgTypes, + }.Build() + File_writekey_proto = out.File + file_writekey_proto_rawDesc = nil + file_writekey_proto_goTypes = nil + file_writekey_proto_depIdxs = nil +} diff --git a/contract/protobuf/manager/goproto/writekey/writekey_grpc.pb.go b/contract/protobuf/manager/goproto/writekey/writekey_grpc.pb.go new file mode 100644 index 00000000..93b75854 --- /dev/null +++ b/contract/protobuf/manager/goproto/writekey/writekey_grpc.pb.go @@ -0,0 +1,110 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.4.0 +// - protoc v3.12.4 +// source: writekey.proto + +package writekey + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.62.0 or later. +const _ = grpc.SupportPackageIsVersion8 + +const ( + WriteKeyManager_GetWriteKey_FullMethodName = "/writekey.WriteKeyManager/GetWriteKey" +) + +// WriteKeyManagerClient is the client API for WriteKeyManager service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type WriteKeyManagerClient interface { + GetWriteKey(ctx context.Context, in *GetWriteKeyRequest, opts ...grpc.CallOption) (*GetWriteKeyResponse, error) +} + +type writeKeyManagerClient struct { + cc grpc.ClientConnInterface +} + +func NewWriteKeyManagerClient(cc grpc.ClientConnInterface) WriteKeyManagerClient { + return &writeKeyManagerClient{cc} +} + +func (c *writeKeyManagerClient) GetWriteKey(ctx context.Context, in *GetWriteKeyRequest, opts ...grpc.CallOption) (*GetWriteKeyResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetWriteKeyResponse) + err := c.cc.Invoke(ctx, WriteKeyManager_GetWriteKey_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// WriteKeyManagerServer is the server API for WriteKeyManager service. +// All implementations must embed UnimplementedWriteKeyManagerServer +// for forward compatibility +type WriteKeyManagerServer interface { + GetWriteKey(context.Context, *GetWriteKeyRequest) (*GetWriteKeyResponse, error) + mustEmbedUnimplementedWriteKeyManagerServer() +} + +// UnimplementedWriteKeyManagerServer must be embedded to have forward compatible implementations. +type UnimplementedWriteKeyManagerServer struct { +} + +func (UnimplementedWriteKeyManagerServer) GetWriteKey(context.Context, *GetWriteKeyRequest) (*GetWriteKeyResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetWriteKey not implemented") +} +func (UnimplementedWriteKeyManagerServer) mustEmbedUnimplementedWriteKeyManagerServer() {} + +// UnsafeWriteKeyManagerServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to WriteKeyManagerServer will +// result in compilation errors. +type UnsafeWriteKeyManagerServer interface { + mustEmbedUnimplementedWriteKeyManagerServer() +} + +func RegisterWriteKeyManagerServer(s grpc.ServiceRegistrar, srv WriteKeyManagerServer) { + s.RegisterService(&WriteKeyManager_ServiceDesc, srv) +} + +func _WriteKeyManager_GetWriteKey_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetWriteKeyRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(WriteKeyManagerServer).GetWriteKey(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: WriteKeyManager_GetWriteKey_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(WriteKeyManagerServer).GetWriteKey(ctx, req.(*GetWriteKeyRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// WriteKeyManager_ServiceDesc is the grpc.ServiceDesc for WriteKeyManager service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var WriteKeyManager_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "writekey.WriteKeyManager", + HandlerType: (*WriteKeyManagerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetWriteKey", + Handler: _WriteKeyManager_GetWriteKey_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "writekey.proto", +} diff --git a/contract/protobuf/manager/manager.proto b/contract/protobuf/manager/manager.proto deleted file mode 100644 index e69de29b..00000000 diff --git a/contract/protobuf/manager/writekey.proto b/contract/protobuf/manager/writekey.proto new file mode 100644 index 00000000..beefa6fa --- /dev/null +++ b/contract/protobuf/manager/writekey.proto @@ -0,0 +1,33 @@ +syntax = "proto3"; + +package writekey; + +option go_package = "github.com/ormushq/ormus/contract/protobuf/manager/goproto/writekey"; + +import "google/protobuf/timestamp.proto"; + +message WriteKeyMetaData { + string write_key = 1; + string owner_id = 2; + string source_id = 3; + google.protobuf.Timestamp created_at = 4; + google.protobuf.Timestamp last_used_at = 5; + WriteKeyStatus status = 6; +} + +enum WriteKeyStatus { + WRITE_KEY_STATUS_INACTIVE = 0; + WRITE_KEY_STATUS_ACTIVE = 1; +} + +message GetWriteKeyRequest { + string write_key = 1; +} + +message GetWriteKeyResponse { + WriteKeyMetaData metadata = 1; +} + +service WriteKeyManager { + rpc GetWriteKey(GetWriteKeyRequest) returns (GetWriteKeyResponse); +} \ No newline at end of file diff --git a/manager/config.go b/manager/config.go index 3e92ae1c..ea97ebd5 100644 --- a/manager/config.go +++ b/manager/config.go @@ -9,7 +9,10 @@ type Config struct { JWTConfig authservice.JwtConfig `koanf:"jwt_config"` InternalBrokerConfig InternalBrokerConfig `koanf:"internal_broker_config"` ScyllaDBConfig scylladb.Config `koanf:"scylla_DB_Config"` + HTTPAddress string `koanf:"http_address"` + GRPCServiceAddress string `koanf:"grpc_service_address"` } + type InternalBrokerConfig struct { ChannelSize int `koanf:"channel_size"` NumberInstant int `koanf:"number_instant"` diff --git a/manager/delivery/grpcserver/get_writekey.go b/manager/delivery/grpcserver/get_writekey.go new file mode 100644 index 00000000..aed03983 --- /dev/null +++ b/manager/delivery/grpcserver/get_writekey.go @@ -0,0 +1,53 @@ +package grpcserver + +import ( + "context" + "errors" + + "github.com/ormushq/ormus/contract/protobuf/manager/goproto/writekey" + "github.com/ormushq/ormus/manager/repository/sourcerepo" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func (s *Server) GetWriteKey(ctx context.Context, + req *writekey.GetWriteKeyRequest, +) (*writekey.GetWriteKeyResponse, error) { + if req.WriteKey == "" { + return nil, status.Error(codes.InvalidArgument, "write key is required") + } + + select { + case <-ctx.Done(): + return nil, status.Error(codes.Canceled, "request was canceled") + default: + } + + // Retrieve the write key metadata from your database + // This is where we call the database access layer + metadata, err := s.sourceRepo.GetWriteKey(ctx, req.WriteKey) + if err != nil { + if errors.Is(err, sourcerepo.ErrWriteKeyNotFound) { + return nil, status.Error(codes.NotFound, "write key not found") + } + return nil, status.Error(codes.Internal, "failed to retrieve write key") + } + + select { + case <-ctx.Done(): + return nil, status.Error(codes.Canceled, "request was canceled") + default: + } + + protoMetadata := &writekey.WriteKeyMetaData{ + WriteKey: metadata.WriteKey, + OwnerId: metadata.OwnerID, + SourceId: metadata.SourceID, + CreatedAt: timestamppb.New(metadata.CreatedAt), + LastUsedAt: timestamppb.New(metadata.LastUsedAt), + Status: writekey.WriteKeyStatus(writekey.WriteKeyStatus_value[string(metadata.Status)]), + } + + return &writekey.GetWriteKeyResponse{Metadata: protoMetadata}, nil +} diff --git a/manager/delivery/grpcserver/server.go b/manager/delivery/grpcserver/server.go new file mode 100644 index 00000000..8550a7f4 --- /dev/null +++ b/manager/delivery/grpcserver/server.go @@ -0,0 +1,75 @@ +package grpcserver + +import ( + "context" + "fmt" + "log" + "net" + "sync" + + "github.com/ormushq/ormus/contract/protobuf/manager/goproto/writekey" + "github.com/ormushq/ormus/manager" + "github.com/ormushq/ormus/manager/repository/sourcerepo" + "google.golang.org/grpc" +) + +type Server struct { + writekey.UnimplementedWriteKeyManagerServer + sourceRepo sourcerepo.SourceRepository + config manager.Config + grpcServer *grpc.Server + mu sync.Mutex +} + +func New(sourceRepo sourcerepo.SourceRepository, config manager.Config) Server { + return Server{ + sourceRepo: sourceRepo, + config: config, + } +} + +func (s *Server) Start() error { + s.mu.Lock() + defer s.mu.Unlock() + + address := fmt.Sprintf(":%s", s.config.GRPCServiceAddress) + listener, err := net.Listen("tcp", address) + if err != nil { + return fmt.Errorf("failed to listen: %v", err) + } + + s.grpcServer = grpc.NewServer() + writekey.RegisterWriteKeyManagerServer(s.grpcServer, s) + + log.Printf("manager gRPC server listening at %s\n", address) + if err := s.grpcServer.Serve(listener); err != nil { + return fmt.Errorf("failed to serve: %v", err) + } + + return nil +} + +func (s *Server) Stop(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.grpcServer == nil { + return nil + } + + stopped := make(chan struct{}) + go func() { + s.grpcServer.GracefulStop() + close(stopped) + }() + + select { + case <-ctx.Done(): + s.grpcServer.Stop() + case <-stopped: + } + + s.grpcServer = nil + + return nil +} diff --git a/manager/delivery/httpserver/server.go b/manager/delivery/httpserver/server.go index 7de42165..3fb3abe8 100644 --- a/manager/delivery/httpserver/server.go +++ b/manager/delivery/httpserver/server.go @@ -1,6 +1,7 @@ package httpserver import ( + "context" "github.com/labstack/echo/v4" "github.com/ormushq/ormus/manager" "github.com/ormushq/ormus/manager/delivery/httpserver/sourcehandler" @@ -26,7 +27,7 @@ func New(cfg manager.Config, setupSvc SetupServicesResponse) *Server { } } -func (s *Server) Server() { +func (s *Server) Start() error { e := echo.New() s.userHandler.SetUserRoute(e) @@ -34,5 +35,26 @@ func (s *Server) Server() { e.GET("/health-check", s.healthCheck) - e.Logger.Fatal(e.Start(":8080")) + go func() { + if err := e.Start(s.config.HTTPAddress); err != nil { + e.Logger.Info("shutting down the server") + } + }() + + return nil +} + +func (s *Server) Shutdown(ctx context.Context) error { + return s.Router.Shutdown(ctx) } + +//func (s *Server) Server() { +// e := echo.New() +// +// s.userHandler.SetUserRoute(e) +// s.sourceHandler.SetSourceRoute(e) +// +// e.GET("/health-check", s.healthCheck) +// +// e.Logger.Fatal(e.Start(":8080")) +//} diff --git a/manager/entity/source.go b/manager/entity/source.go index 46c8cdde..604d21ba 100644 --- a/manager/entity/source.go +++ b/manager/entity/source.go @@ -4,8 +4,6 @@ import ( "time" ) -type WriteKey string // because we might change the format in future - type SourceCategory string type Status string @@ -18,7 +16,7 @@ const ( // TODO: need change feilds. type Source struct { ID string - WriteKey WriteKey + WriteKey WriteKeyMetaData Name string Description string ProjectID string diff --git a/manager/entity/writekey.go b/manager/entity/writekey.go new file mode 100644 index 00000000..75d8173a --- /dev/null +++ b/manager/entity/writekey.go @@ -0,0 +1,19 @@ +package entity + +import "time" + +type WriteKeyStatus string + +const ( + WriteKeyStatusInactive WriteKeyStatus = "WRITE_KEY_STATUS_INACTIVE" + WriteKeyStatusActive WriteKeyStatus = "WRITE_KEY_STATUS_ACTIVE" +) + +type WriteKeyMetaData struct { + WriteKey string `json:"write_key"` + OwnerID string `json:"owner_id"` + SourceID string `json:"source_id"` + CreatedAt time.Time `json:"created_at"` + LastUsedAt time.Time `json:"last_used_at"` + Status WriteKeyStatus `json:"status"` +} diff --git a/manager/managerparam/add_source.go b/manager/managerparam/add_source.go index ed13db14..48176b2d 100644 --- a/manager/managerparam/add_source.go +++ b/manager/managerparam/add_source.go @@ -13,14 +13,14 @@ type AddSourceRequest struct { } type AddSourceResponse struct { - ID string - WriteKey string - Name string - Description string - ProjectID string - OwnerID string - Status entity.Status - CreateAt time.Time - UpdateAt time.Time - DeleteAt *time.Time + ID string + WriteKeyMetaData entity.WriteKeyMetaData + Name string + Description string + ProjectID string + OwnerID string + Status entity.Status + CreateAt time.Time + UpdateAt time.Time + DeleteAt *time.Time } diff --git a/manager/managerparam/update_source.go b/manager/managerparam/update_source.go index fccc40be..c23d4a43 100644 --- a/manager/managerparam/update_source.go +++ b/manager/managerparam/update_source.go @@ -14,14 +14,14 @@ type UpdateSourceRequest struct { } type UpdateSourceResponse struct { - ID string - WriteKey string - Name string - Description string - ProjectID string - OwnerID string - Status entity.Status - CreateAt time.Time - UpdateAt time.Time - DeleteAt *time.Time + ID string + WriteKeyMetaData entity.WriteKeyMetaData + Name string + Description string + ProjectID string + OwnerID string + Status entity.Status + CreateAt time.Time + UpdateAt time.Time + DeleteAt *time.Time } diff --git a/manager/managerparam/writekey.go b/manager/managerparam/writekey.go new file mode 100644 index 00000000..a47060a1 --- /dev/null +++ b/manager/managerparam/writekey.go @@ -0,0 +1,19 @@ +package managerparam + +import "time" + +type WriteKeyStatus string + +const ( + WriteKeyStatusInactive WriteKeyStatus = "WRITE_KEY_STATUS_INACTIVE" + WriteKeyStatusActive WriteKeyStatus = "WRITE_KEY_STATUS_ACTIVE" +) + +type WriteKeyMetaData struct { + WriteKey string `json:"write_key"` + OwnerID string `json:"owner_id"` + SourceID string `json:"source_id"` + CreatedAt time.Time `json:"created_at"` + LastUsedAt time.Time `json:"last_used_at"` + Status WriteKeyStatus `json:"status"` +} diff --git a/manager/mockRepo/sourcemock/source_repo_mock.go b/manager/mockRepo/sourcemock/source_repo_mock.go index abd47b0a..30c239ef 100644 --- a/manager/mockRepo/sourcemock/source_repo_mock.go +++ b/manager/mockRepo/sourcemock/source_repo_mock.go @@ -2,28 +2,43 @@ package sourcemock import ( "fmt" + "time" "github.com/ormushq/ormus/manager/entity" "github.com/ormushq/ormus/manager/managerparam" "github.com/ormushq/ormus/pkg/errmsg" "github.com/ormushq/ormus/pkg/richerror" + writekey "github.com/ormushq/ormus/pkg/write_key" ) const RepoErr = "repository error" type DefaultSourceTest struct { ID string - WriteKey entity.WriteKey + WriteKey entity.WriteKeyMetaData Name string Description string ProjectID string OwnerID string } +func GetDefaultWriteKeyMetaData() entity.WriteKeyMetaData { + w, _ := writekey.GenerateNewWriteKey() + + return entity.WriteKeyMetaData{ + WriteKey: w, + OwnerID: "owner_id", + SourceID: "source_id", + CreatedAt: time.Now(), + LastUsedAt: time.Now(), + Status: entity.WriteKeyStatusActive, + } +} + func DefaultSource() DefaultSourceTest { return DefaultSourceTest{ ID: "source_id", - WriteKey: entity.WriteKey("writekey"), + WriteKey: GetDefaultWriteKeyMetaData(), Name: "name name", Description: "description", ProjectID: "project_id", @@ -63,16 +78,16 @@ func (m *MockRepo) InsertSource(source *entity.Source) (*managerparam.AddSourceR m.sources = append(m.sources, source) return &managerparam.AddSourceResponse{ - ID: source.ID, - WriteKey: string(source.WriteKey), - Name: source.Name, - Description: source.Description, - ProjectID: source.ProjectID, - OwnerID: source.OwnerID, - Status: source.Status, - CreateAt: source.CreateAt, - UpdateAt: source.UpdateAt, - DeleteAt: source.DeleteAt, + ID: source.ID, + WriteKeyMetaData: source.WriteKey, // TODO: Only write key value or the whole meta data object? + Name: source.Name, + Description: source.Description, + ProjectID: source.ProjectID, + OwnerID: source.OwnerID, + Status: source.Status, + CreateAt: source.CreateAt, + UpdateAt: source.UpdateAt, + DeleteAt: source.DeleteAt, }, nil } @@ -86,16 +101,16 @@ func (m *MockRepo) UpdateSource(id string, source *entity.Source) (*managerparam m.sources[i] = source return &managerparam.UpdateSourceResponse{ - ID: source.ID, - WriteKey: string(source.WriteKey), - Name: source.Name, - Description: source.Description, - ProjectID: source.ProjectID, - OwnerID: source.OwnerID, - Status: source.Status, - CreateAt: source.CreateAt, - UpdateAt: source.UpdateAt, - DeleteAt: source.DeleteAt, + ID: source.ID, + WriteKeyMetaData: source.WriteKey, + Name: source.Name, + Description: source.Description, + ProjectID: source.ProjectID, + OwnerID: source.OwnerID, + Status: source.Status, + CreateAt: source.CreateAt, + UpdateAt: source.UpdateAt, + DeleteAt: source.DeleteAt, }, nil } } @@ -146,3 +161,56 @@ func (m *MockRepo) IsSourceAlreadyCreatedByName(name string) (bool, error) { return false, nil } + +func (m *MockRepo) UpdateWriteKeyMetaData(metadata *entity.WriteKeyMetaData) error { + if m.hasErr { + return richerror.New("MockRepo.UpdateWriteKey").WithWrappedError(fmt.Errorf(RepoErr)) + } + + for i, s := range m.sources { + if s.WriteKey.WriteKey == metadata.WriteKey { + s.WriteKey = *metadata + m.sources[i] = s + return nil + } + } + + return richerror.New("MockRepo.UpdateWriteKey").WithMessage(errmsg.ErrFailedToUpdateWriteKeyMetaData) +} + +func (m *MockRepo) GetWriteKeyMetaData(writeKey string) (*managerparam.WriteKeyMetaData, error) { + if m.hasErr { + return nil, richerror.New("MockRepo.GetWriteKey").WithWrappedError(fmt.Errorf(RepoErr)) + } + + for _, s := range m.sources { + if s.WriteKey.WriteKey == writeKey { + return &managerparam.WriteKeyMetaData{ + WriteKey: s.WriteKey.WriteKey, + OwnerID: s.WriteKey.OwnerID, + SourceID: s.WriteKey.SourceID, + CreatedAt: s.WriteKey.CreatedAt, + LastUsedAt: s.WriteKey.LastUsedAt, + Status: managerparam.WriteKeyStatus(s.WriteKey.Status), + }, nil + } + } + + return nil, richerror.New("MockRepo.GetWriteKeyMetaData").WithMessage(errmsg.ErrFailedToGetWriteKeyMetaData) +} + +func (m *MockRepo) UpdateLastUsedAt(writeKey string, lastUsedAt time.Time) error { + if m.hasErr { + return richerror.New("MockRepo.UpdateLastUsedAt").WithWrappedError(fmt.Errorf(RepoErr)) + } + + for i, s := range m.sources { + if s.WriteKey.WriteKey == writeKey { + s.WriteKey.LastUsedAt = lastUsedAt + m.sources[i] = s + return nil + } + } + + return richerror.New("MockRepo.UpdateLastUsedAt").WithMessage(errmsg.ErrWriteKeyNotFound) +} diff --git a/manager/repository/sourcerepo/source.go b/manager/repository/sourcerepo/source.go new file mode 100644 index 00000000..4fd8f62a --- /dev/null +++ b/manager/repository/sourcerepo/source.go @@ -0,0 +1,52 @@ +package sourcerepo + +import ( + "context" + "errors" + "strings" + + "github.com/ormushq/ormus/manager/entity" + "github.com/ormushq/ormus/manager/repository/scyllarepo" + "github.com/scylladb/gocqlx/v2/qb" +) + +var ErrWriteKeyNotFound = errors.New("write key not found") + +type SourceRepository interface { + GetWriteKey(ctx context.Context, key string) (entity.WriteKeyMetaData, error) +} + +type sourceRepo struct { + scyllaAdapter *scyllarepo.StorageAdapter +} + +func New(scyllaAdapter *scyllarepo.StorageAdapter) SourceRepository { + return &sourceRepo{scyllaAdapter: scyllaAdapter} +} + +func (s sourceRepo) GetWriteKey(ctx context.Context, + writeKey string, +) (entity.WriteKeyMetaData, error) { + var metadata entity.WriteKeyMetaData + + stmt, names := qb.Select("write_keys"). + Columns("write_key", "owner_id", "source_id", "created_at", "last_used_at", "status"). + Where(qb.Eq("write_key")). + ToCql() + + m := map[string]interface{}{ + "write_key": writeKey, + } + + // Execute the query + q := s.scyllaAdapter.ScyllaConn.Query(stmt, names) + qx := q.BindMap(m) + if err := qx.Get(&metadata); err != nil { + if strings.Contains(err.Error(), "not found") { + return entity.WriteKeyMetaData{}, ErrWriteKeyNotFound + } + return entity.WriteKeyMetaData{}, err + } + + return metadata, nil +} diff --git a/manager/service/sourceservice/create.go b/manager/service/sourceservice/create.go index 2f303c16..11f601d3 100644 --- a/manager/service/sourceservice/create.go +++ b/manager/service/sourceservice/create.go @@ -3,28 +3,52 @@ package sourceservice import ( "github.com/ormushq/ormus/manager/entity" "github.com/ormushq/ormus/manager/managerparam" + "github.com/ormushq/ormus/pkg/richerror" writekey "github.com/ormushq/ormus/pkg/write_key" + "time" ) func (s Service) CreateSource(req *managerparam.AddSourceRequest, ownerID string) (*managerparam.AddSourceResponse, error) { + const op = "sourceservice.CreateSource" + w, err := writekey.GenerateNewWriteKey() if err != nil { return nil, err } - source := &entity.Source{ - ID: "", // TODO uuid ulid ? - WriteKey: entity.WriteKey(w), + writeKeyMetaData := entity.WriteKeyMetaData{ + WriteKey: w, + OwnerID: ownerID, + SourceID: "", + CreatedAt: time.Now(), + LastUsedAt: time.Now(), + Status: entity.WriteKeyStatusActive, + } + + source := entity.Source{ + ID: "", // Will be generated by the repository + WriteKey: writeKeyMetaData, Name: req.Name, Description: req.Description, OwnerID: ownerID, ProjectID: req.ProjectID, + Status: entity.StatusActive, + CreateAt: time.Now(), + UpdateAt: time.Now(), } - response, err := s.repo.InsertSource(source) + // TODO: The reason(s) behind '&source' instead of 'source' + response, err := s.repo.InsertSource(&source) if err != nil { return nil, err } + writeKeyMetaData.SourceID = response.ID + + err = s.repo.UpdateWriteKeyMetaData(&writeKeyMetaData) + if err != nil { + return nil, richerror.New(op) + } + return response, nil } diff --git a/manager/service/sourceservice/service.go b/manager/service/sourceservice/service.go index 5836ee34..291dd2e1 100644 --- a/manager/service/sourceservice/service.go +++ b/manager/service/sourceservice/service.go @@ -3,6 +3,7 @@ package sourceservice import ( "github.com/ormushq/ormus/manager/entity" "github.com/ormushq/ormus/manager/managerparam" + "time" ) type SourceRepo interface { @@ -11,6 +12,9 @@ type SourceRepo interface { DeleteSource(id, userID string) error GetUserSourceByID(ownerID, id string) (*entity.Source, error) IsSourceAlreadyCreatedByName(name string) (bool, error) + UpdateWriteKeyMetaData(metadata *entity.WriteKeyMetaData) error + GetWriteKeyMetaData(writeKey string) (*managerparam.WriteKeyMetaData, error) + UpdateLastUsedAt(writeKey string, lastUsedAt time.Time) error } type Service struct { diff --git a/pkg/errmsg/message.go b/pkg/errmsg/message.go index d3e42f6a..11bae9fb 100644 --- a/pkg/errmsg/message.go +++ b/pkg/errmsg/message.go @@ -1,17 +1,20 @@ package errmsg const ( - ErrJwtEmptyUser = "for generating a JWT token, email is required" - ErrWrongCredentials = "username or password isn't correct" - ErrSomeThingWentWrong = "some thing went wrong" - ErrAuthUserNotFound = "user not found" - ErrEmailIsNotValid = "email is not valid" - ErrAuthUserExisting = "a user with this email is already registered" - ErrPasswordIsNotValid = "password is not valid" - ErrorMsgInvalidInput = "invalid input" - ErrBadRequest = "Bad request" - ErrUserNotFound = "user not found" - ErrChannelNotFound = "channel not found: %v" - ErrFailedToOpenChannel = "failed to open rabbitmq channel" - ErrFailedToCloseChannel = "failed to close rabbitmq channel" + ErrJwtEmptyUser = "for generating a JWT token, email is required" + ErrWrongCredentials = "username or password isn't correct" + ErrSomeThingWentWrong = "some thing went wrong" + ErrAuthUserNotFound = "user not found" + ErrEmailIsNotValid = "email is not valid" + ErrAuthUserExisting = "a user with this email is already registered" + ErrPasswordIsNotValid = "password is not valid" + ErrorMsgInvalidInput = "invalid input" + ErrBadRequest = "Bad request" + ErrUserNotFound = "user not found" + ErrChannelNotFound = "channel not found: %v" + ErrFailedToOpenChannel = "failed to open rabbitmq channel" + ErrFailedToCloseChannel = "failed to close rabbitmq channel" + ErrFailedToUpdateWriteKeyMetaData = "failed to update write key metadata" + ErrFailedToGetWriteKeyMetaData = "get write key metadata" + ErrWriteKeyNotFound = "write key not found" ) diff --git a/source/params/writekey.go b/source/params/writekey.go new file mode 100644 index 00000000..7813a72f --- /dev/null +++ b/source/params/writekey.go @@ -0,0 +1,12 @@ +package params + +import "time" + +type WriteKeyMetaData struct { + WriteKey string `json:"write_key"` + OwnerID string `json:"owner_id"` + SourceID string `json:"source_id"` + CreatedAt time.Time `json:"created_at"` + LastUsedAt time.Time `json:"last_used_at"` + Status string `json:"status"` +} diff --git a/source/repository/redis/rediswritekey/db.go b/source/repository/redis/rediswritekey/db.go deleted file mode 100644 index 9f006012..00000000 --- a/source/repository/redis/rediswritekey/db.go +++ /dev/null @@ -1,12 +0,0 @@ -package rediswritekey - -import "github.com/ormushq/ormus/adapter/redis" - -type DB struct { - adapter redis.Adapter -} - -// New is Constructor redis DB. -func New(adapter redis.Adapter) DB { - return DB{adapter: adapter} -} diff --git a/source/repository/redis/rediswritekey/db_test.go b/source/repository/redis/rediswritekey/db_test.go deleted file mode 100644 index 45d32e36..00000000 --- a/source/repository/redis/rediswritekey/db_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package rediswritekey_test - -import ( - "os" - "testing" - - "github.com/ormushq/ormus/adapter/redis" - "github.com/ormushq/ormus/config" - "github.com/ormushq/ormus/source/repository/redis/rediswritekey" - "github.com/stretchr/testify/assert" -) - -func TestMain(m *testing.M) { - os.Exit(m.Run()) -} - -func setup(t *testing.T) (rediswritekey.DB, func()) { - redisAdapter, err := redis.New(config.C().Redis) - assert.Nil(t, err) - redisRepository := rediswritekey.New(redisAdapter) - return redisRepository, func() { - // TODO - cleanup - } -} diff --git a/source/repository/redis/writekey/db.go b/source/repository/redis/writekey/db.go new file mode 100644 index 00000000..eebfefbe --- /dev/null +++ b/source/repository/redis/writekey/db.go @@ -0,0 +1,92 @@ +package writekey + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/ormushq/ormus/adapter/redis" + "github.com/ormushq/ormus/contract/protobuf/manager/goproto/writekey" + "github.com/ormushq/ormus/manager/entity" +) + +type DB struct { + adapter redis.Adapter + managerClient writekey.WriteKeyManagerClient +} + +func New(adapter redis.Adapter, managerClient writekey.WriteKeyManagerClient) DB { + return DB{ + adapter: adapter, + managerClient: managerClient, + } +} + +func (db DB) IsValidWriteKey(ctx context.Context, writeKey string) (*entity.WriteKeyMetaData, error) { + // try to get it from redis first + result, err := db.getFromRedis(ctx, writeKey) + if err == nil { + return result, nil + } + + // Key was not in the cache, make a GRPC call to manager + resp, err := db.managerClient.GetWriteKey(ctx, &writekey.GetWriteKeyRequest{ + WriteKey: writeKey, + }) + if err != nil { + return nil, fmt.Errorf("failed to get write key from manager: %w", err) + } + + metadata := &entity.WriteKeyMetaData{ + WriteKey: resp.Metadata.WriteKey, + OwnerID: resp.Metadata.OwnerId, + SourceID: resp.Metadata.SourceId, + CreatedAt: resp.Metadata.CreatedAt.AsTime(), + LastUsedAt: resp.Metadata.LastUsedAt.AsTime(), + Status: entity.WriteKeyStatus(resp.Metadata.Status.String()), + } + + // cache the metadata + if err := db.updateRedis(ctx, writeKey, metadata); err != nil { // TODO: retrying strategy? + // Continue even though there is an error + // TODO: logging maybe! I suppose nothing happen when Redis's not updated, we send GRPC. + } + + return metadata, nil +} + +func (db DB) getFromRedis(ctx context.Context, writeKey string) (*entity.WriteKeyMetaData, error) { + metadataJSON, err := db.adapter.Client().Get(ctx, "writekey:"+writeKey).Result() + if err == nil { + var metadata entity.WriteKeyMetaData + if err := json.Unmarshal([]byte(metadataJSON), &metadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal writekey metadata: %w", err) + } + + return &metadata, nil + } + + return &entity.WriteKeyMetaData{}, err +} + +func (db DB) updateRedis(ctx context.Context, writeKey string, metadata *entity.WriteKeyMetaData) error { + metadataJSON, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal writekey metadata: %w", err) + } + // we set TTL for write keys, we don't need to fill up the memory with + // all the already generated write keys. + return db.adapter.Client().Set(ctx, "writekey:"+writeKey, metadataJSON, 24*time.Hour).Err() +} + +// InvalidateWriteKey is mechanism of write key invalidation in cache(Redis) when +// a write key is updated or deleted in the core(manager) service. +func (db DB) InvalidateWriteKey(ctx context.Context, writeKey string) error { + err := db.adapter.Client().Del(ctx, "writekey:"+writeKey).Err() + if err != nil { + return fmt.Errorf("failed to invalidate write key in Redis: %w", err) + } + + return nil +} diff --git a/source/repository/redis/writekey/db_test.go b/source/repository/redis/writekey/db_test.go new file mode 100644 index 00000000..d93862c2 --- /dev/null +++ b/source/repository/redis/writekey/db_test.go @@ -0,0 +1,35 @@ +package writekey_test + +import ( + "google.golang.org/grpc" + "os" + "testing" + + "github.com/ormushq/ormus/adapter/redis" + "github.com/ormushq/ormus/config" + pbwritekey "github.com/ormushq/ormus/contract/protobuf/manager/goproto/writekey" + "github.com/ormushq/ormus/source/repository/redis/writekey" + "github.com/stretchr/testify/assert" +) + +func TestMain(m *testing.M) { + os.Exit(m.Run()) +} + +func setup(t *testing.T) (writekey.DB, func()) { + redisAdapter, err := redis.New(config.C().Redis) + assert.Nil(t, err) + + managerClientConn, err := grpc.Dial(config.C().Manager.GRPCServiceAddress, grpc.WithInsecure()) + assert.Nil(t, err) + + managerClient := pbwritekey.NewWriteKeyManagerClient(managerClientConn) + + redisRepository := writekey.New(redisAdapter, managerClient) + return redisRepository, func() { + err := managerClientConn.Close() + if err != nil { + return + } + } +} diff --git a/source/service/writekey/isvalid.go b/source/service/writekey/isvalid.go new file mode 100644 index 00000000..d7c849bb --- /dev/null +++ b/source/service/writekey/isvalid.go @@ -0,0 +1,35 @@ +package writekey + +import ( + "context" + + "github.com/ormushq/ormus/pkg/richerror" + "github.com/ormushq/ormus/source/params" +) + +// IsValid checks whether the writeKey is valid or not. +func (s Service) IsValid(ctx context.Context, writeKey string) (*params.WriteKeyMetaData, error) { + const op = "writekey.IsValid" + + writeKeyMetaData, err := s.repo.IsValidWriteKey(ctx, writeKey) + if err != nil { + return nil, richerror.New(op).WithWrappedError(err) + } + + // If we got the metadata, then the key is valid + if writeKeyMetaData != nil { + dto := ¶ms.WriteKeyMetaData{ + WriteKey: writeKeyMetaData.WriteKey, + OwnerID: writeKeyMetaData.OwnerID, + SourceID: writeKeyMetaData.SourceID, + CreatedAt: writeKeyMetaData.CreatedAt, + LastUsedAt: writeKeyMetaData.LastUsedAt, + Status: string(writeKeyMetaData.Status), + } + + return dto, nil + } + + // no metadata, no error then key is invalid + return nil, nil +} diff --git a/source/service/writekey/service.go b/source/service/writekey/service.go index f6b04e7d..6ca5b741 100644 --- a/source/service/writekey/service.go +++ b/source/service/writekey/service.go @@ -2,37 +2,19 @@ package writekey import ( "context" + "github.com/ormushq/ormus/manager/entity" ) -// Repository is an interface representing what methods should be implemented by the repository. type Repository interface { - // TODO - implementation redis - IsValidWriteKey(ctx context.Context, writeKey string) (bool, error) + IsValidWriteKey(ctx context.Context, writeKey string) (*entity.WriteKeyMetaData, error) } -// Service show dependencies writeKey authservice. type Service struct { repo Repository } -// Constructor writeKey authservice. func New(repo Repository) Service { return Service{ repo: repo, } } - -// IsValid checks whether the writeKey is valid or not. -func (s Service) IsValid(ctx context.Context, writeKey string) (bool, error) { - // TODO - How errmsg handling ? Rich-errmsg or ...? - isValid, err := s.repo.IsValidWriteKey(ctx, writeKey) - if err != nil { - // TODO - logger - return false, err - } - if !isValid { - return false, err - } - - return true, nil -} diff --git a/source/service/writekey/service_test.go b/source/service/writekey/service_test.go index 377a780c..0dd4114d 100644 --- a/source/service/writekey/service_test.go +++ b/source/service/writekey/service_test.go @@ -3,6 +3,7 @@ package writekey_test import ( "context" "fmt" + "github.com/ormushq/ormus/manager/entity" "testing" "github.com/ormushq/ormus/source/service/writekey" @@ -10,12 +11,11 @@ import ( type mockRepo struct{} -// TODO - use https://github.com/golang/mock -func (m mockRepo) IsValidWriteKey(ctx context.Context, writeKey string) (bool, error) { +func (m mockRepo) IsValidWriteKey(ctx context.Context, writeKey string) (*entity.WriteKeyMetaData, error) { if writeKey == "" { - return false, fmt.Errorf("writekey not found") + return nil, fmt.Errorf("writekey not found") } - return true, nil + return &entity.WriteKeyMetaData{}, nil } func TestIsValid(t *testing.T) { @@ -33,11 +33,11 @@ func TestIsValid(t *testing.T) { m := new(mockRepo) service := writekey.New(m) ctx := context.Background() - isValid, err := service.IsValid(ctx, "asdfffg4g5g56d5s4s6s5sd8") + metaData, err := service.IsValid(ctx, "asdfffg4g5g56d5s4s6s5sd8") if err != nil { t.Fatal("error is not nil") } - if !isValid { + if metaData == nil { t.Fatal("writekey is not valid") } })