Skip to content

Commit

Permalink
Updated summary information
Browse files Browse the repository at this point in the history
- Updated summary information to include pod's parent_name and parent_type
- Updated functions to retrieve daemonset details
- Updated constants with deployment, replicaset, statefulset, daemonset and jobs values
- Updated protobuf definitions to include parent_name and parent_type to pod information

Signed-off-by: Vishnu Soman <[email protected]>
  • Loading branch information
vishnusomank committed Jun 12, 2023
1 parent 418d8cc commit e586e08
Show file tree
Hide file tree
Showing 21 changed files with 496 additions and 390 deletions.
168 changes: 74 additions & 94 deletions src/cluster/k8sClientHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,12 @@ func GetClusterNameFromK8sClient() string {
return "default"
}

// ================= //
// == Deployments == //
// ================= //
// =================== //
// == K8s Resources == //
// =================== //

func GetObjectSetsFromK8sClient(objectSet []string) []types.Deployment {

func GetDeploymentsFromK8sClient() []types.Deployment {
results := []types.Deployment{}

client := ConnectK8sClient()
Expand All @@ -443,124 +444,103 @@ func GetDeploymentsFromK8sClient() []types.Deployment {
return results
}

// get namespaces from k8s api client
deployments, err := client.AppsV1().Deployments("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, obj := range objectSet {

for _, d := range deployments.Items {
if d.Namespace == "kube-system" {
continue
}
switch obj {
case types.K8sDeploymentType:
deployments, err := client.AppsV1().Deployments("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, d := range deployments.Items {
results = append(results, GenerateResourceList(d.OwnerReferences, d.Name, d.Namespace, types.K8sDeploymentType, d.Spec.Selector.MatchLabels)...)
}

if d.Spec.Selector.MatchLabels != nil {
var labels []string
case types.K8sStatefulSetsType:
// get StatefulSet List from k8s api client
statefulSets, err := client.AppsV1().StatefulSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, sts := range statefulSets.Items {
results = append(results, GenerateResourceList(sts.OwnerReferences, sts.Name, sts.Namespace, types.K8sStatefulSetsType, sts.Spec.Selector.MatchLabels)...)
}

for k, v := range d.Spec.Selector.MatchLabels {
labels = append(labels, k+"="+v)
case types.K8sDaemonSetsType:
// get DaemonSet List from k8s api client
daemonSets, err := client.AppsV1().DaemonSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, ds := range daemonSets.Items {
results = append(results, GenerateResourceList(ds.OwnerReferences, ds.Name, ds.Namespace, types.K8sDaemonSetsType, ds.Spec.Selector.MatchLabels)...)
}

results = append(results, types.Deployment{
Name: d.Name,
Namespace: d.Namespace,
Labels: strings.Join(labels, ","),
})
case types.K8sReplicaSetsType:
// get ReplicaSet list from k8s api client
replicaSets, err := client.AppsV1().ReplicaSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, rs := range replicaSets.Items {
results = append(results, GenerateResourceList(rs.OwnerReferences, rs.Name, rs.Namespace, types.K8sReplicaSetsType, rs.Spec.Selector.MatchLabels)...)
}
case types.K8sJobsType:
// get Jobs List from k8s api client
jobs, err := client.BatchV1().Jobs("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, job := range jobs.Items {
results = append(results, GenerateResourceList(job.OwnerReferences, job.Name, job.Namespace, types.K8sJobsType, job.Spec.Selector.MatchLabels)...)
}
default:
continue
}
}

results = append(results, GetReplicaSetsFromK8sClient()...)
results = append(results, GetStatefulSetsFromK8sClient()...)
}

return results
}

// ================= //
// == ReplicaSet == //
// ================= //
}

func GetReplicaSetsFromK8sClient() []types.Deployment {
func GenerateResourceList(ownerReference []metav1.OwnerReference, name, namespace, resourceType string, labelMap map[string]string) []types.Deployment {
results := []types.Deployment{}

client := ConnectK8sClient()
if client == nil {
log.Error().Msg("failed to create k8s client")
return results
}

// get namespaces from k8s api client
replicasets, err := client.AppsV1().ReplicaSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}

for _, rs := range replicasets.Items {
if rs.OwnerReferences == nil {
if rs.Namespace == "kube-system" {
continue
}

if rs.Spec.Selector.MatchLabels != nil {
var labels []string

for k, v := range rs.Spec.Selector.MatchLabels {
labels = append(labels, k+"="+v)
}

results = append(results, types.Deployment{
Name: rs.Name,
Namespace: rs.Namespace,
Labels: strings.Join(labels, ","),
})
if resourceType == types.K8sJobsType {
for _, or := range ownerReference {
if or.Kind == types.K8sCronJobsType {
ownerReference = nil
resourceType = types.K8sCronJobsType
break
}
}
}
return results
}

// ================= //
// == StatefulSet == //
// ================= //

func GetStatefulSetsFromK8sClient() []types.Deployment {
results := []types.Deployment{}

client := ConnectK8sClient()
if client == nil {
log.Error().Msg("failed to create k8s client")
return results
}

// get namespaces from k8s api client
statefulset, err := client.AppsV1().StatefulSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}

for _, sts := range statefulset.Items {
if sts.OwnerReferences == nil {
if sts.Namespace == "kube-system" {
continue
}

if sts.Spec.Selector.MatchLabels != nil {
if namespace != "kube-system" {
if ownerReference == nil {
if labelMap != nil {
var labels []string

for k, v := range sts.Spec.Selector.MatchLabels {
for k, v := range labelMap {
labels = append(labels, k+"="+v)
}

results = append(results, types.Deployment{
Name: sts.Name,
Namespace: sts.Namespace,
Name: name,
Namespace: namespace,
Labels: strings.Join(labels, ","),
Type: resourceType,
})
}
}
}

return results
}

Expand Down
16 changes: 16 additions & 0 deletions src/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ func StringDeDuplication(strSlice []string) []string {
return list
}

func MapDeDuplicates(inputMap map[string]string) map[string]string {
// Create a new map to store the unique values
uniqueMap := make(map[string]string)

// Iterate over the input map
for key, value := range inputMap {
// Check if the value already exists in the unique map
if _, exists := uniqueMap[key]; !exists {
// If not, add it to the unique map
uniqueMap[key] = value
}
}

return uniqueMap
}

func HashInt(s string) uint32 {
h := fnv.New32a()
_, _ = h.Write([]byte(s))
Expand Down
1 change: 1 addition & 0 deletions src/conf/local-file.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ recommend:
operation-mode: 1 # 1: cronjob | 2: one-time-job
cron-job-time-interval: "1h0m00s" # format: XhYmZs
recommend-host-policy: true
admission-controller-policy: false

# license
license:
Expand Down
21 changes: 14 additions & 7 deletions src/libs/dbHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,8 @@ func GetPodNames(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string, error
return res, err
}

func GetDeployNames(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string, error) {
res := []string{}
func GetDeployNames(cfg types.ConfigDB, filter types.ObsPodDetail) (map[string]string, error) {
res := map[string]string{}
var err = errors.New("unknown db driver")
if cfg.DBDriver == "mysql" {
res, err = GetDeployNamesMySQL(cfg, filter)
Expand Down Expand Up @@ -433,7 +433,7 @@ func upsertSysSummarySQL(db *sql.DB, summary types.SystemSummary, timeCount type
summary.Labels = strings.Join(sortedLabels, ",")

queryString := `cluster_name = ? and cluster_id = ? and workspace_id = ? and namespace_name = ? and namespace_id = ? and container_name = ? and container_image = ?
and podname = ? and operation = ? and labels = ? and deployment_name = ? and source = ? and destination = ?
and podname = ? and operation = ? and labels = ? and resource_name = ? and resource_type = ? and source = ? and destination = ?
and destination_namespace = ? and destination_labels = ? and type = ? and ip = ? and port = ? and protocol = ? and action = ? and bindport = ? and bindaddr = ?`

query := "UPDATE " + TableSystemSummarySQLite + " SET count=count+?, updated_time=? WHERE " + queryString + " "
Expand All @@ -458,6 +458,7 @@ func upsertSysSummarySQL(db *sql.DB, summary types.SystemSummary, timeCount type
summary.Operation,
summary.Labels,
summary.Deployment,
summary.Workload.Type,
summary.Source,
summary.Destination,
summary.DestNamespace,
Expand All @@ -479,8 +480,8 @@ func upsertSysSummarySQL(db *sql.DB, summary types.SystemSummary, timeCount type

if err == nil && rowsAffected == 0 {

insertQueryString := `(cluster_name,cluster_id,workspace_id,namespace_name,namespace_id,container_name,container_image,container_id,podname,operation,labels,deployment_name,
source,destination,destination_namespace,destination_labels,type,ip,port,protocol,action,count,updated_time,bindport,bindaddr) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
insertQueryString := `(cluster_name,cluster_id,workspace_id,namespace_name,namespace_id,container_name,container_image,container_id,podname,operation,labels,resource_name, resource_type,
source,destination,destination_namespace,destination_labels,type,ip,port,protocol,action,count,updated_time,bindport,bindaddr) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`

insertQuery := "INSERT INTO " + TableSystemSummarySQLite + insertQueryString

Expand All @@ -503,6 +504,7 @@ func upsertSysSummarySQL(db *sql.DB, summary types.SystemSummary, timeCount type
summary.Operation,
summary.Labels,
summary.Deployment,
summary.Workload.Type,
summary.Source,
summary.Destination,
summary.DestNamespace,
Expand Down Expand Up @@ -545,7 +547,7 @@ func getSysSummarySQL(db *sql.DB, dbName string, filterOptions types.SystemSumma
resSummary := []types.SystemSummary{}

query := `SELECT cluster_name,cluster_id,workspace_id,namespace_name,namespace_id,container_name,
container_image,container_id,podname,operation,labels,deployment_name,source,destination,destination_namespace,
container_image,container_id,podname,operation,labels,resource_name,resource_type,source,destination,destination_namespace,
destination_labels,type,ip,port,protocol,action,count,updated_time,bindport,bindaddr FROM ` + dbName

var whereClause string
Expand Down Expand Up @@ -596,9 +598,13 @@ func getSysSummarySQL(db *sql.DB, dbName string, filterOptions types.SystemSumma
args = append(args, filterOptions.Labels)
}
if filterOptions.Deployment != "" {
concatWhereClause(&whereClause, "deployment_name")
concatWhereClause(&whereClause, "resource_name")
args = append(args, filterOptions.Deployment)
}
if filterOptions.Workload.Type != "" {
concatWhereClause(&whereClause, "parent_type")
args = append(args, filterOptions.Workload.Type)
}
if filterOptions.Source != "" {
concatWhereClause(&whereClause, "source")
args = append(args, filterOptions.Source)
Expand Down Expand Up @@ -631,6 +637,7 @@ func getSysSummarySQL(db *sql.DB, dbName string, filterOptions types.SystemSumma
&localSum.Operation,
&localSum.Labels,
&localSum.Deployment,
&localSum.Workload.Type,
&localSum.Source,
&localSum.Destination,
&localSum.DestNamespace,
Expand Down
31 changes: 20 additions & 11 deletions src/libs/mysqlHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1572,9 +1572,13 @@ func GetPodNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string,
concatWhereClause(&whereClause, "container_name")
sysargs = append(sysargs, filter.ContainerName)
}
if filter.DeployName != "" {
concatWhereClause(&whereClause, "deployment_name")
sysargs = append(sysargs, filter.DeployName)
if filter.ParentName != "" {
concatWhereClause(&whereClause, "resource_name")
sysargs = append(sysargs, filter.ParentName)
}
if filter.ParentType != "" {
concatWhereClause(&whereClause, "resource_type")
sysargs = append(sysargs, filter.ParentType)
}

results, err = db.Query(query+whereClause, sysargs...)
Expand All @@ -1597,17 +1601,17 @@ func GetPodNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string,
return resPodNames, err
}

func GetDeployNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string, error) {
func GetDeployNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) (map[string]string, error) {
db := connectMySQL(cfg)
defer db.Close()

resDeployNames := []string{}
resDeployNames := map[string]string{}

var results *sql.Rows
var err error

// Get podnames from system table
query := "SELECT deployment_name FROM " + TableSystemSummarySQLite + " "
query := "SELECT resource_name, resource_type FROM " + TableSystemSummarySQLite + " "

var whereClause string
var sysargs []interface{}
Expand All @@ -1620,9 +1624,13 @@ func GetDeployNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]strin
concatWhereClause(&whereClause, "namespace_name")
sysargs = append(sysargs, filter.Namespace)
}
if filter.DeployName != "" {
concatWhereClause(&whereClause, "deployment_name")
sysargs = append(sysargs, filter.DeployName)
if filter.ParentName != "" {
concatWhereClause(&whereClause, "resource_name")
sysargs = append(sysargs, filter.ParentName)
}
if filter.ParentType != "" {
concatWhereClause(&whereClause, "resource_type")
sysargs = append(sysargs, filter.ParentType)
}
if filter.Labels != "" {
concatWhereClause(&whereClause, "labels")
Expand All @@ -1637,13 +1645,14 @@ func GetDeployNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]strin
defer results.Close()

for results.Next() {
var locDeployName string
var locDeployName, locDeployType string
if err := results.Scan(
&locDeployName,
&locDeployType,
); err != nil {
return nil, err
}
resDeployNames = append(resDeployNames, locDeployName)
resDeployNames[locDeployName] = locDeployType
}

return resDeployNames, err
Expand Down
Loading

0 comments on commit e586e08

Please sign in to comment.