Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated summary with pod's parent information #733

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 74 additions & 94 deletions src/cluster/k8sClientHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,11 +430,12 @@ func GetClusterNameFromK8sClient() string {
return "default"
}

// ================= //
// == Deployments == //
// ================= //
// =================== //
// == K8s Resources == //
// =================== //

func GetObjectSetsFromK8sClient(objectSet []string) []types.Deployment {

func GetDeploymentsFromK8sClient() []types.Deployment {
results := []types.Deployment{}

client := ConnectK8sClient()
Expand All @@ -443,124 +444,103 @@ func GetDeploymentsFromK8sClient() []types.Deployment {
return results
}

// get namespaces from k8s api client
deployments, err := client.AppsV1().Deployments("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, obj := range objectSet {

for _, d := range deployments.Items {
if d.Namespace == "kube-system" {
continue
}
switch obj {
case types.K8sDeploymentType:
deployments, err := client.AppsV1().Deployments("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, d := range deployments.Items {
results = append(results, GenerateResourceList(d.OwnerReferences, d.Name, d.Namespace, types.K8sDeploymentType, d.Spec.Selector.MatchLabels)...)
}

if d.Spec.Selector.MatchLabels != nil {
var labels []string
case types.K8sStatefulSetsType:
// get StatefulSet List from k8s api client
statefulSets, err := client.AppsV1().StatefulSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, sts := range statefulSets.Items {
results = append(results, GenerateResourceList(sts.OwnerReferences, sts.Name, sts.Namespace, types.K8sStatefulSetsType, sts.Spec.Selector.MatchLabels)...)
}

for k, v := range d.Spec.Selector.MatchLabels {
labels = append(labels, k+"="+v)
case types.K8sDaemonSetsType:
// get DaemonSet List from k8s api client
daemonSets, err := client.AppsV1().DaemonSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, ds := range daemonSets.Items {
results = append(results, GenerateResourceList(ds.OwnerReferences, ds.Name, ds.Namespace, types.K8sDaemonSetsType, ds.Spec.Selector.MatchLabels)...)
}

results = append(results, types.Deployment{
Name: d.Name,
Namespace: d.Namespace,
Labels: strings.Join(labels, ","),
})
case types.K8sReplicaSetsType:
// get ReplicaSet list from k8s api client
replicaSets, err := client.AppsV1().ReplicaSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, rs := range replicaSets.Items {
results = append(results, GenerateResourceList(rs.OwnerReferences, rs.Name, rs.Namespace, types.K8sReplicaSetsType, rs.Spec.Selector.MatchLabels)...)
}
case types.K8sJobsType:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to consider cronjobs? Although it creates new jobs periodically.

// get Jobs List from k8s api client
jobs, err := client.BatchV1().Jobs("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}
for _, job := range jobs.Items {
results = append(results, GenerateResourceList(job.OwnerReferences, job.Name, job.Namespace, types.K8sJobsType, job.Spec.Selector.MatchLabels)...)
}
default:
continue
}
}

results = append(results, GetReplicaSetsFromK8sClient()...)
results = append(results, GetStatefulSetsFromK8sClient()...)
}

return results
}

// ================= //
// == ReplicaSet == //
// ================= //
}

func GetReplicaSetsFromK8sClient() []types.Deployment {
func GenerateResourceList(ownerReference []metav1.OwnerReference, name, namespace, resourceType string, labelMap map[string]string) []types.Deployment {
results := []types.Deployment{}

client := ConnectK8sClient()
if client == nil {
log.Error().Msg("failed to create k8s client")
return results
}

// get namespaces from k8s api client
replicasets, err := client.AppsV1().ReplicaSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}

for _, rs := range replicasets.Items {
if rs.OwnerReferences == nil {
if rs.Namespace == "kube-system" {
continue
}

if rs.Spec.Selector.MatchLabels != nil {
var labels []string

for k, v := range rs.Spec.Selector.MatchLabels {
labels = append(labels, k+"="+v)
}

results = append(results, types.Deployment{
Name: rs.Name,
Namespace: rs.Namespace,
Labels: strings.Join(labels, ","),
})
if resourceType == types.K8sJobsType {
for _, or := range ownerReference {
if or.Kind == types.K8sCronJobsType {
ownerReference = nil
resourceType = types.K8sCronJobsType
break
}
}
}
return results
}

// ================= //
// == StatefulSet == //
// ================= //

func GetStatefulSetsFromK8sClient() []types.Deployment {
results := []types.Deployment{}

client := ConnectK8sClient()
if client == nil {
log.Error().Msg("failed to create k8s client")
return results
}

// get namespaces from k8s api client
statefulset, err := client.AppsV1().StatefulSets("").List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Error().Msg(err.Error())
return results
}

for _, sts := range statefulset.Items {
if sts.OwnerReferences == nil {
if sts.Namespace == "kube-system" {
continue
}

if sts.Spec.Selector.MatchLabels != nil {
if namespace != "kube-system" {
if ownerReference == nil {
if labelMap != nil {
var labels []string

for k, v := range sts.Spec.Selector.MatchLabels {
for k, v := range labelMap {
labels = append(labels, k+"="+v)
}

results = append(results, types.Deployment{
Name: sts.Name,
Namespace: sts.Namespace,
Name: name,
Namespace: namespace,
Labels: strings.Join(labels, ","),
Type: resourceType,
})
}
}
}

return results
}

Expand Down
16 changes: 16 additions & 0 deletions src/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ func StringDeDuplication(strSlice []string) []string {
return list
}

func MapDeDuplicates(inputMap map[string]string) map[string]string {
// Create a new map to store the unique values
uniqueMap := make(map[string]string)

// Iterate over the input map
for key, value := range inputMap {
// Check if the value already exists in the unique map
if _, exists := uniqueMap[key]; !exists {
// If not, add it to the unique map
uniqueMap[key] = value
}
}

return uniqueMap
}

func HashInt(s string) uint32 {
h := fnv.New32a()
_, _ = h.Write([]byte(s))
Expand Down
1 change: 1 addition & 0 deletions src/conf/local-file.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ recommend:
operation-mode: 1 # 1: cronjob | 2: one-time-job
cron-job-time-interval: "1h0m00s" # format: XhYmZs
recommend-host-policy: true
admission-controller-policy: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?


# license
license:
Expand Down
21 changes: 14 additions & 7 deletions src/libs/dbHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,8 @@ func GetPodNames(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string, error
return res, err
}

func GetDeployNames(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string, error) {
res := []string{}
func GetDeployNames(cfg types.ConfigDB, filter types.ObsPodDetail) (map[string]string, error) {
res := map[string]string{}
var err = errors.New("unknown db driver")
if cfg.DBDriver == "mysql" {
res, err = GetDeployNamesMySQL(cfg, filter)
Expand Down Expand Up @@ -433,7 +433,7 @@ func upsertSysSummarySQL(db *sql.DB, summary types.SystemSummary, timeCount type
summary.Labels = strings.Join(sortedLabels, ",")

queryString := `cluster_name = ? and cluster_id = ? and workspace_id = ? and namespace_name = ? and namespace_id = ? and container_name = ? and container_image = ?
and podname = ? and operation = ? and labels = ? and deployment_name = ? and source = ? and destination = ?
and podname = ? and operation = ? and labels = ? and resource_name = ? and resource_type = ? and source = ? and destination = ?
and destination_namespace = ? and destination_labels = ? and type = ? and ip = ? and port = ? and protocol = ? and action = ? and bindport = ? and bindaddr = ?`

query := "UPDATE " + TableSystemSummarySQLite + " SET count=count+?, updated_time=? WHERE " + queryString + " "
Expand All @@ -458,6 +458,7 @@ func upsertSysSummarySQL(db *sql.DB, summary types.SystemSummary, timeCount type
summary.Operation,
summary.Labels,
summary.Deployment,
summary.Workload.Type,
summary.Source,
summary.Destination,
summary.DestNamespace,
Expand All @@ -479,8 +480,8 @@ func upsertSysSummarySQL(db *sql.DB, summary types.SystemSummary, timeCount type

if err == nil && rowsAffected == 0 {

insertQueryString := `(cluster_name,cluster_id,workspace_id,namespace_name,namespace_id,container_name,container_image,container_id,podname,operation,labels,deployment_name,
source,destination,destination_namespace,destination_labels,type,ip,port,protocol,action,count,updated_time,bindport,bindaddr) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`
insertQueryString := `(cluster_name,cluster_id,workspace_id,namespace_name,namespace_id,container_name,container_image,container_id,podname,operation,labels,resource_name, resource_type,
source,destination,destination_namespace,destination_labels,type,ip,port,protocol,action,count,updated_time,bindport,bindaddr) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`

insertQuery := "INSERT INTO " + TableSystemSummarySQLite + insertQueryString

Expand All @@ -503,6 +504,7 @@ func upsertSysSummarySQL(db *sql.DB, summary types.SystemSummary, timeCount type
summary.Operation,
summary.Labels,
summary.Deployment,
summary.Workload.Type,
summary.Source,
summary.Destination,
summary.DestNamespace,
Expand Down Expand Up @@ -545,7 +547,7 @@ func getSysSummarySQL(db *sql.DB, dbName string, filterOptions types.SystemSumma
resSummary := []types.SystemSummary{}

query := `SELECT cluster_name,cluster_id,workspace_id,namespace_name,namespace_id,container_name,
container_image,container_id,podname,operation,labels,deployment_name,source,destination,destination_namespace,
container_image,container_id,podname,operation,labels,resource_name,resource_type,source,destination,destination_namespace,
destination_labels,type,ip,port,protocol,action,count,updated_time,bindport,bindaddr FROM ` + dbName

var whereClause string
Expand Down Expand Up @@ -596,9 +598,13 @@ func getSysSummarySQL(db *sql.DB, dbName string, filterOptions types.SystemSumma
args = append(args, filterOptions.Labels)
}
if filterOptions.Deployment != "" {
concatWhereClause(&whereClause, "deployment_name")
concatWhereClause(&whereClause, "resource_name")
args = append(args, filterOptions.Deployment)
}
if filterOptions.Workload.Type != "" {
concatWhereClause(&whereClause, "parent_type")
args = append(args, filterOptions.Workload.Type)
}
if filterOptions.Source != "" {
concatWhereClause(&whereClause, "source")
args = append(args, filterOptions.Source)
Expand Down Expand Up @@ -631,6 +637,7 @@ func getSysSummarySQL(db *sql.DB, dbName string, filterOptions types.SystemSumma
&localSum.Operation,
&localSum.Labels,
&localSum.Deployment,
&localSum.Workload.Type,
&localSum.Source,
&localSum.Destination,
&localSum.DestNamespace,
Expand Down
31 changes: 20 additions & 11 deletions src/libs/mysqlHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1572,9 +1572,13 @@ func GetPodNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string,
concatWhereClause(&whereClause, "container_name")
sysargs = append(sysargs, filter.ContainerName)
}
if filter.DeployName != "" {
concatWhereClause(&whereClause, "deployment_name")
sysargs = append(sysargs, filter.DeployName)
if filter.ParentName != "" {
concatWhereClause(&whereClause, "resource_name")
sysargs = append(sysargs, filter.ParentName)
}
if filter.ParentType != "" {
concatWhereClause(&whereClause, "resource_type")
sysargs = append(sysargs, filter.ParentType)
}

results, err = db.Query(query+whereClause, sysargs...)
Expand All @@ -1597,17 +1601,17 @@ func GetPodNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string,
return resPodNames, err
}

func GetDeployNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string, error) {
func GetDeployNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) (map[string]string, error) {
db := connectMySQL(cfg)
defer db.Close()

resDeployNames := []string{}
resDeployNames := map[string]string{}

var results *sql.Rows
var err error

// Get podnames from system table
query := "SELECT deployment_name FROM " + TableSystemSummarySQLite + " "
query := "SELECT resource_name, resource_type FROM " + TableSystemSummarySQLite + " "

var whereClause string
var sysargs []interface{}
Expand All @@ -1620,9 +1624,13 @@ func GetDeployNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]strin
concatWhereClause(&whereClause, "namespace_name")
sysargs = append(sysargs, filter.Namespace)
}
if filter.DeployName != "" {
concatWhereClause(&whereClause, "deployment_name")
sysargs = append(sysargs, filter.DeployName)
if filter.ParentName != "" {
concatWhereClause(&whereClause, "resource_name")
sysargs = append(sysargs, filter.ParentName)
}
if filter.ParentType != "" {
concatWhereClause(&whereClause, "resource_type")
sysargs = append(sysargs, filter.ParentType)
}
if filter.Labels != "" {
concatWhereClause(&whereClause, "labels")
Expand All @@ -1637,13 +1645,14 @@ func GetDeployNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]strin
defer results.Close()

for results.Next() {
var locDeployName string
var locDeployName, locDeployType string
if err := results.Scan(
&locDeployName,
&locDeployType,
); err != nil {
return nil, err
}
resDeployNames = append(resDeployNames, locDeployName)
resDeployNames[locDeployName] = locDeployType
}

return resDeployNames, err
Expand Down
Loading