diff --git a/tests/foreman/api/test_ansible.py b/tests/foreman/api/test_ansible.py index 71d47b860d2..1d0c253b3b7 100644 --- a/tests/foreman/api/test_ansible.py +++ b/tests/foreman/api/test_ansible.py @@ -470,3 +470,65 @@ def test_positive_ansible_job_on_multiple_host( assert result.succeeded == 2 # SELECTED_ROLE working on rhel8/rhel9 clients assert result.failed == 1 # SELECTED_ROLE failing on rhel7 client assert result.status_label == 'failed' + + @pytest.mark.no_containers + @pytest.mark.rhel_ver_match('[^6]') + def test_positive_ansible_localhost_job_on_host( + self, target_sat, module_org, module_location, module_ak_with_synced_repo, rhel_contenthost + ): + """Test successful execution of Ansible Job with "hosts: localhost" on host. + + :id: c8dcdc54-cb98-4b24-bff9-049a6cc36acd + + :steps: + 1. Register a content host with satellite + 2. Run the Ansible playbook with "hosts: localhost" on registered host + 3. Check if the job is executed and verify job output + + :expectedresults: + 1. Ansible playbook with "hosts: localhost" job execution must be successful. + + :BZ: 1982698 + + :customerscenario: true + """ + playbook = ''' + --- + - name: Simple Ansible Playbook for Localhost + hosts: localhost + gather_facts: no + tasks: + - name: Print a message + debug: + msg: "Hello, localhost!" + ''' + result = rhel_contenthost.register( + module_org, module_location, module_ak_with_synced_repo.name, target_sat + ) + assert result.status == 0, f'Failed to register host: {result.stderr}' + + template_id = ( + target_sat.api.JobTemplate() + .search(query={'search': 'name="Ansible - Run playbook"'})[0] + .id + ) + job = target_sat.api.JobInvocation().run( + synchronous=False, + data={ + 'job_template_id': template_id, + 'targeting_type': 'static_query', + 'search_query': f'name = {rhel_contenthost.hostname}', + 'inputs': {'playbook': playbook}, + }, + ) + target_sat.wait_for_tasks( + f'resource_type = JobInvocation and resource_id = {job["id"]}', poll_timeout=1000 + ) + result = target_sat.api.JobInvocation(id=job['id']).read() + assert result.pending == 0 + assert result.succeeded == 1 + assert result.status_label == 'succeeded' + + result = target_sat.api.JobInvocation(id=job['id']).outputs()['outputs'][0]['output'] + assert [i['output'] for i in result if '"msg": "Hello, localhost!"' in i['output']] + assert [i['output'] for i in result if i['output'] == 'Exit status: 0']