Skip to content

Commit

Permalink
Merge pull request Sunbird-Knowlg#1310 from project-sunbird/flink-con…
Browse files Browse the repository at this point in the history
…fig-3.6.0

assessment.publish.request topic and questionset publish flink config
  • Loading branch information
maheshkumargangula authored Jan 18, 2021
2 parents b255d21 + d8a0695 commit 6f75034
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 134 deletions.
2 changes: 1 addition & 1 deletion ansible/inventory/env/group_vars/all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ learner_user_home: "/home/{{ learner_user }}"
# these variables can be defined as {{instance}}_*
content_keyspace_name: "{{instance}}_content_store"
hierarchy_keyspace_name: "{{instance}}_hierarchy_store"
orchestrator_keyspace_name: "{{instance}}_script_store"
dialcode_keyspace_name: "{{instance}}_dialcode_store"
category_keyspace_name: "{{instance}}_category_store"
assessment_keyspace_name: "{{instance}}_question_store"


dp_cassandra_connection: "{{ groups['dp-cassandra'][0] }}:9042"
Expand Down
17 changes: 0 additions & 17 deletions ansible/roles/cassandra-db-update/templates/data.cql.j2
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,6 @@ CREATE TABLE IF NOT EXISTS {{ content_keyspace_name }}.content_data (
PRIMARY KEY (content_id)
);

CREATE KEYSPACE IF NOT EXISTS {{ orchestrator_keyspace_name }} WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};

CREATE TABLE IF NOT EXISTS {{ orchestrator_keyspace_name }}.script_data (
name text,
type text,
reqmap text,
PRIMARY KEY (name)
);

CREATE TABLE IF NOT EXISTS {{ content_keyspace_name }}.question_data (
question_id text,
last_updated_on timestamp,
Expand Down Expand Up @@ -65,11 +53,6 @@ ALTER KEYSPACE {{ hierarchy_keyspace_name }} WITH replication = {
'datacenter1' : 2
};

ALTER KEYSPACE {{ orchestrator_keyspace_name }} WITH replication = {
'class': 'NetworkTopologyStrategy',
'datacenter1' : 2
};

ALTER TABLE {{ content_keyspace_name }}.content_data ADD (externallink text);
{% endif %}

Expand Down
2 changes: 0 additions & 2 deletions ansible/roles/learning-service/templates/application.conf.j2
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ redis.port={{ redis_port }}
redis.maxConnections=128

# Cassandra Configuration
orchestrator.keyspace.name="{{ orchestrator_keyspace_name }}"
orchestrator.keyspace.table="script_data"
content.keyspace.name="{{ content_keyspace_name }}"
hierarchy.keyspace.name="{{ hierarchy_keyspace_name }}"
content.hierarchy.table="content_hierarchy"
Expand Down
8 changes: 6 additions & 2 deletions ansible/roles/setup-kafka/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ processing_kafka_topics:
- name: generate.certificate.failed
num_of_partitions: 1
replication_factor: 1
- name: assessment.publish.request
num_of_partitions: 1
replication_factor: 1

processing_kafka_overriden_topics:
- name: telemetry.raw
Expand Down Expand Up @@ -228,5 +231,6 @@ processing_kafka_overriden_topics:
- name: generate.certificate.failed
retention_time: 1209600000
replication_factor: 1


- name: assessment.publish.request
retention_time: 1209600000
replication_factor: 1
18 changes: 2 additions & 16 deletions kubernetes/ansible/roles/flink-jobs-deploy/defaults/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,8 @@ flink_job_names:
taskmanager_memory: 1024m
taskslots: 1
cpu_requests: 0.3
certificate-generator:
job_class_name: 'org.sunbird.job.task.CertificateGeneratorStreamTask'
replica: 1
jobmanager_memory: 1024m
taskmanager_memory: 1024m
taskslots: 1
cpu_requests: 0.3
post-certificate-processor:
job_class_name: 'org.sunbird.job.task.PostCertificateProcessorStreamTask'
replica: 1
jobmanager_memory: 1024m
taskmanager_memory: 1024m
taskslots: 1
cpu_requests: 0.3
certificate-pre-processor:
job_class_name: 'org.sunbird.job.task.CertificatePreProcessorStreamTask'
questionset-publish:
job_class_name: 'org.sunbird.job.task.QuestionSetPublishStreamTask'
replica: 1
jobmanager_memory: 1024m
taskmanager_memory: 1024m
Expand Down
109 changes: 17 additions & 92 deletions kubernetes/helm_charts/datapipeline_jobs/values.j2
Original file line number Diff line number Diff line change
Expand Up @@ -192,108 +192,33 @@ post-publish-processor:
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1

certificate-generator:
certificate-generator: |+
questionset-publish:
questionset-publish: |+
include file("/data/flink/conf/base-config.conf")
kafka {
input.topic = {{ env_name }}.generate.certificate.request
output.failed.topic = {{ env_name }}.generate.certificate.failed
output.post.certificate.processor.topic = {{ env_name }}.post.certificate.process.request
groupId = {{ env_name }}-generator-certificate-group
input.topic = {{ env_name }}.assessment.publish.request
post_publish.topic = {{ env_name }}.content.postpublish.request
groupId = {{ env_name }}-questionset-publish-group
}
task {
consumer.parallelism = {{ certificate_generator_consumer_parallelism }}
parallelism = {{ certificate_generator_parallelism }}
}
cert-reg {
basePath = "{{ cert_reg_service_base_url }}"
}
enc-service {
basePath = "{{ enc_service_base_url }}"
}
cert_domain_url = "{{ cert_domain_url }}"
cert_container_name = "{{ cert_container_name }}"
cert_cloud_storage_type = "{{ cert_cloud_storage_type }}"
cert_azure_storage_secret = "{{ cert_azure_storage_secret }}"
cert_azure_storage_key = "{{ cert_azure_storage_key }}"

flink-conf: |+
jobmanager.memory.flink.size: {{ flink_job_names['certificate-generator'].jobmanager_memory }}
taskmanager.memory.flink.size: {{ flink_job_names['certificate-generator'].taskmanager_memory }}
taskmanager.numberOfTaskSlots: {{ flink_job_names['certificate-generator'].taskslots }}
parallelism.default: 1
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1

post-certificate-processor:
post-certificate-processor: |+
include file("/data/flink/conf/base-config.conf")
kafka {
input.topic = {{ env_name }}.post.certificate.process.request
output.audit.topic = {{ env_name }}.telemetry.raw
output.failed.topic = {{ env_name }}.post.certificate.process.failed
groupId = {{ env_name }}-post-certificate-process-group
consumer.parallelism = 1
parallelism = 1
router.parallelism = 1
}
task {
consumer.parallelism = {{ post_certificate_processor_consumer_parallelism }}
parallelism = {{ post_certificate_processor_parallelism }}
question {
keyspace = "{{ assessment_keyspace_name }}"
table = "question_data"
}
lms-cassandra {
keyspace = "{{ middleware_course_keyspace }}"
enrollment.table = "{{ middleware_user_enrollment_table }}"
course_batch.table = "{{ middleware_course_batch_table }}"
}
learner-service {
basePath = "{{ learner_service_base_url }}"
questionset {
keyspace = "{{ hierarchy_keyspace_name }}"
table = "questionset_hierarchy"
}

flink-conf: |+
jobmanager.memory.flink.size: {{ flink_job_names['post-publish-processor'].jobmanager_memory }}
taskmanager.memory.flink.size: {{ flink_job_names['post-publish-processor'].taskmanager_memory }}
taskmanager.numberOfTaskSlots: {{ flink_job_names['post-publish-processor'].taskslots }}
jobmanager.memory.flink.size: {{ flink_job_names['questionset-publish'].jobmanager_memory }}
taskmanager.memory.flink.size: {{ flink_job_names['questionset-publish'].taskmanager_memory }}
taskmanager.numberOfTaskSlots: {{ flink_job_names['questionset-publish'].taskslots }}
parallelism.default: 1
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1

certificate-pre-processor:
certificate-pre-processor: |+
include file("/data/flink/conf/base-config.conf")
kafka {
input.topic = {{ env_name }}.issue.certificate.request
groupId = {{ env_name }}-certificate-pre-processor-group
output.topic = {{ env_name }}.generate.certificate.request
output.failed.topic = {{ env_name }}.certificate.events.failed
}
task {
consumer.parallelism = {{ certificate_pre_processor_consumer_parallelism }}
parallelism = {{ certificate_pre_processor_parallelism }}
}
lms-cassandra {
keyspace = "{{ middleware_course_keyspace }}"
batchTable = "course_batch"
userTable = "user_enrolments"
assessmentAggregatorTable = "assessment_aggregator"
}
content {
search {
basePath = "{{ kp_search_service_base_url }}"
}
basePath = "{{ kp_content_service_base_url }}"
}
lms {
basePath = "{{ lms_service_base_url }}"
}
cert {
basePath = "{{ certificate_base_path }}"
}
learner-service {
basePath = "{{ learner_service_base_url }}"
}

flink-conf: |+
jobmanager.memory.flink.size: {{ flink_job_names['certificate-pre-processor'].jobmanager_memory }}
taskmanager.memory.flink.size: {{ flink_job_names['certificate-pre-processor'].taskmanager_memory }}
taskmanager.numberOfTaskSlots: {{ flink_job_names['certificate-pre-processor'].taskslots }}
parallelism.default: 1
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.fraction: 0.1
2 changes: 0 additions & 2 deletions platform-core/unit-tests/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ content.metadata.visibility.parent=["textbookunit", "courseunit", "lessonplanuni

# Cassandra Configuration
content.keyspace.name=content_store_test
orchestrator.keyspace.name="script_store"
orchestrator.keyspace.table="script_data"
hierarchy.keyspace.name=hierarchy_store_test
content.hierarchy.table=content_hierarchy_test
framework.hierarchy.table=framework_hierarchy_test
Expand Down
2 changes: 0 additions & 2 deletions platform-modules/service/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ content.metadata.visibility.parent=["textbookunit", "courseunit", "lessonplanuni

# Cassandra Configuration
content.keyspace.name=content_store
orchestrator.keyspace.name=script_store
orchestrator.keyspace.table=script_data
cassandra.lp.connection="127.0.0.1:9042,127.0.0.2:9042,127.0.0.3:9042"
cassandra.lpa.connection="127.0.0.1:9042,127.0.0.2:9042,127.0.0.3:9042"

Expand Down

0 comments on commit 6f75034

Please sign in to comment.