From 144cffe009d0be86caaddff5f8f6c1cbfd7e306d Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Fri, 6 Mar 2020 13:39:59 +0100 Subject: [PATCH 1/4] Send job and template nodes to analytics Sending tables main_workflowjobnode and main_workflowjobtemplatenode containing arrays of success/failure/always_nodes, which is compatible with what the API call for nodes returns. --- awx/main/analytics/collectors.py | 64 ++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 852d6a71ec..438285caca 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -230,6 +230,8 @@ def query_info(since, collection_type): @table_version('events_table.csv', '1.1') @table_version('unified_jobs_table.csv', '1.0') @table_version('unified_job_template_table.csv', '1.0') +@table_version('workflow_job_node_table.csv', '1.0') +@table_version('workflow_job_template_node_table.csv', '1.0') def copy_tables(since, full_path): def _copy_table(table, query, path): file_path = os.path.join(path, table + '_table.csv') @@ -311,4 +313,66 @@ def copy_tables(since, full_path): WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER''' _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) + + workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, + main_workflowjobnode.created, + main_workflowjobnode.modified, + main_workflowjobnode.job_id, + main_workflowjobnode.unified_job_template_id, + main_workflowjobnode.workflow_job_id, + main_workflowjobnode.inventory_id, + success_nodes.nodes AS success_nodes, + failure_nodes.nodes AS failure_nodes, + always_nodes.nodes AS always_nodes, + main_workflowjobnode.do_not_run, + main_workflowjobnode.all_parents_must_converge + FROM main_workflowjobnode + LEFT JOIN ( + SELECT from_workflowjobnode_id, 
ARRAY_AGG(to_workflowjobnode_id) AS nodes + FROM main_workflowjobnode_success_nodes + GROUP BY from_workflowjobnode_id + ) success_nodes ON main_workflowjobnode.id = success_nodes.from_workflowjobnode_id + LEFT JOIN ( + SELECT from_workflowjobnode_id, ARRAY_AGG(to_workflowjobnode_id) AS nodes + FROM main_workflowjobnode_failure_nodes + GROUP BY from_workflowjobnode_id + ) failure_nodes ON main_workflowjobnode.id = failure_nodes.from_workflowjobnode_id + LEFT JOIN ( + SELECT from_workflowjobnode_id, ARRAY_AGG(to_workflowjobnode_id) AS nodes + FROM main_workflowjobnode_always_nodes + GROUP BY from_workflowjobnode_id + ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id + WHERE main_workflowjobnode.modified > {} + ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) + _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) + + workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id, + main_workflowjobtemplatenode.created, + main_workflowjobtemplatenode.modified, + main_workflowjobtemplatenode.unified_job_template_id, + main_workflowjobtemplatenode.workflow_job_template_id, + main_workflowjobtemplatenode.inventory_id, + success_nodes.nodes AS success_nodes, + failure_nodes.nodes AS failure_nodes, + always_nodes.nodes AS always_nodes, + main_workflowjobtemplatenode.all_parents_must_converge + FROM main_workflowjobtemplatenode + LEFT JOIN ( + SELECT from_workflowjobtemplatenode_id, ARRAY_AGG(to_workflowjobtemplatenode_id) AS nodes + FROM main_workflowjobtemplatenode_success_nodes + GROUP BY from_workflowjobtemplatenode_id + ) success_nodes ON main_workflowjobtemplatenode.id = success_nodes.from_workflowjobtemplatenode_id + LEFT JOIN ( + SELECT from_workflowjobtemplatenode_id, ARRAY_AGG(to_workflowjobtemplatenode_id) AS nodes + FROM main_workflowjobtemplatenode_failure_nodes + GROUP BY from_workflowjobtemplatenode_id + ) 
failure_nodes ON main_workflowjobtemplatenode.id = failure_nodes.from_workflowjobtemplatenode_id + LEFT JOIN ( + SELECT from_workflowjobtemplatenode_id, ARRAY_AGG(to_workflowjobtemplatenode_id) AS nodes + FROM main_workflowjobtemplatenode_always_nodes + GROUP BY from_workflowjobtemplatenode_id + ) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id + ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) + _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path) + return From 5b0bb4939f7ee65f40de2e75e11c470fd39c4479 Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Wed, 15 Apr 2020 12:26:57 -0400 Subject: [PATCH 2/4] Allow subsets of table gathering for unit tests. sqlite does not like some of our PG-isms. --- awx/main/analytics/collectors.py | 21 ++++++++++++------- .../functional/analytics/test_collectors.py | 2 +- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/awx/main/analytics/collectors.py b/awx/main/analytics/collectors.py index 438285caca..331983e24a 100644 --- a/awx/main/analytics/collectors.py +++ b/awx/main/analytics/collectors.py @@ -232,7 +232,7 @@ def query_info(since, collection_type): @table_version('unified_job_template_table.csv', '1.0') @table_version('workflow_job_node_table.csv', '1.0') @table_version('workflow_job_template_node_table.csv', '1.0') -def copy_tables(since, full_path): +def copy_tables(since, full_path, subset=None): def _copy_table(table, query, path): file_path = os.path.join(path, table + '_table.csv') file = open(file_path, 'w', encoding='utf-8') @@ -264,7 +264,8 @@ def copy_tables(since, full_path): FROM main_jobevent WHERE main_jobevent.created > {} ORDER BY main_jobevent.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) - _copy_table(table='events', query=events_query, path=full_path) + if not subset or 'events' in 
subset: + _copy_table(table='events', query=events_query, path=full_path) unified_job_query = '''COPY (SELECT main_unifiedjob.id, main_unifiedjob.polymorphic_ctype_id, @@ -292,7 +293,8 @@ def copy_tables(since, full_path): WHERE (main_unifiedjob.created > {0} OR main_unifiedjob.finished > {0}) AND main_unifiedjob.launch_type != 'sync' ORDER BY main_unifiedjob.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) - _copy_table(table='unified_jobs', query=unified_job_query, path=full_path) + if not subset or 'unified_jobs' in subset: + _copy_table(table='unified_jobs', query=unified_job_query, path=full_path) unified_job_template_query = '''COPY (SELECT main_unifiedjobtemplate.id, main_unifiedjobtemplate.polymorphic_ctype_id, @@ -311,8 +313,9 @@ def copy_tables(since, full_path): main_unifiedjobtemplate.status FROM main_unifiedjobtemplate, django_content_type WHERE main_unifiedjobtemplate.polymorphic_ctype_id = django_content_type.id - ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER''' - _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) + ORDER BY main_unifiedjobtemplate.id ASC) TO STDOUT WITH CSV HEADER''' + if not subset or 'unified_job_template' in subset: + _copy_table(table='unified_job_template', query=unified_job_template_query, path=full_path) workflow_job_node_query = '''COPY (SELECT main_workflowjobnode.id, main_workflowjobnode.created, @@ -344,7 +347,8 @@ def copy_tables(since, full_path): ) always_nodes ON main_workflowjobnode.id = always_nodes.from_workflowjobnode_id WHERE main_workflowjobnode.modified > {} ORDER BY main_workflowjobnode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) - _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) + if not subset or 'workflow_job_node' in subset: + _copy_table(table='workflow_job_node', query=workflow_job_node_query, path=full_path) 
workflow_job_template_node_query = '''COPY (SELECT main_workflowjobtemplatenode.id, main_workflowjobtemplatenode.created, @@ -372,7 +376,8 @@ def copy_tables(since, full_path): FROM main_workflowjobtemplatenode_always_nodes GROUP BY from_workflowjobtemplatenode_id ) always_nodes ON main_workflowjobtemplatenode.id = always_nodes.from_workflowjobtemplatenode_id - ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER'''.format(since.strftime("'%Y-%m-%d %H:%M:%S'")) - _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path) + ORDER BY main_workflowjobtemplatenode.id ASC) TO STDOUT WITH CSV HEADER''' + if not subset or 'workflow_job_template_node' in subset: + _copy_table(table='workflow_job_template_node', query=workflow_job_template_node_query, path=full_path) return diff --git a/awx/main/tests/functional/analytics/test_collectors.py b/awx/main/tests/functional/analytics/test_collectors.py index 21a243c907..73f03ba7b6 100644 --- a/awx/main/tests/functional/analytics/test_collectors.py +++ b/awx/main/tests/functional/analytics/test_collectors.py @@ -69,7 +69,7 @@ def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, j job_name = job_template.create_unified_job().name with tempfile.TemporaryDirectory() as tmpdir: - collectors.copy_tables(time_start, tmpdir) + collectors.copy_tables(time_start, tmpdir, subset='unified_jobs') with open(os.path.join(tmpdir, 'unified_jobs_table.csv')) as f: lines = ''.join([l for l in f]) From 921feb561ddbaf434013593dc350eb3e92cbd4a1 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Thu, 16 Apr 2020 11:48:15 -0400 Subject: [PATCH 3/4] add test case for wfj nodes analytics --- .../functional/analytics/test_collectors.py | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/awx/main/tests/functional/analytics/test_collectors.py b/awx/main/tests/functional/analytics/test_collectors.py index 73f03ba7b6..91aaf7ffbc 100644 --- 
a/awx/main/tests/functional/analytics/test_collectors.py +++ b/awx/main/tests/functional/analytics/test_collectors.py @@ -12,6 +12,9 @@ from awx.main.analytics import collectors from awx.main.models import ( ProjectUpdate, InventorySource, + WorkflowJob, + WorkflowJobNode, + JobTemplate, ) @@ -29,6 +32,8 @@ def sqlite_copy_expert(request): sql = sql.replace('COPY (', '') sql = sql.replace(') TO STDOUT WITH CSV HEADER', '') + # sqlite equivalent + sql = sql.replace('ARRAY_AGG', 'GROUP_CONCAT') # Remove JSON style queries # TODO: could replace JSON style queries with sqlite kind of equivalents @@ -76,3 +81,57 @@ def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, j assert project_update_name in lines assert inventory_update_name in lines assert job_name in lines + + +@pytest.fixture +def workflow_job(states=['new', 'new', 'new', 'new', 'new']): + """ + Workflow topology: + node[0] + /\ + s/ \f + / \ + node[1,5] node[3] + / \ + s/ \f + / \ + node[2] node[4] + """ + wfj = WorkflowJob.objects.create() + jt = JobTemplate.objects.create(name='test-jt') + nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 6)] + for node, state in zip(nodes, states): + if state: + node.job = jt.create_job() + node.job.status = state + node.job.save() + node.save() + nodes[0].success_nodes.add(nodes[1]) + nodes[0].success_nodes.add(nodes[5]) + nodes[1].success_nodes.add(nodes[2]) + nodes[0].failure_nodes.add(nodes[3]) + nodes[3].failure_nodes.add(nodes[4]) + return wfj + + +@pytest.mark.django_db +def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job): + time_start = now() + + with tempfile.TemporaryDirectory() as tmpdir: + collectors.copy_tables(time_start, tmpdir, subset='workflow_job_node_query') + with open(os.path.join(tmpdir, 'workflow_job_node_table.csv')) as f: + reader = csv.reader(f) + # Pop the headers + next(reader) + lines = [l for l in reader] + + ids = [int(l[0]) for l in 
lines] + + assert ids == list(workflow_job.workflow_nodes.all().values_list('id', flat=True)) + + for index, relationship in zip([7, 8, 9], ['success_nodes', 'failure_nodes', 'always_nodes']): + for i, l in enumerate(lines): + related_nodes = [int(e) for e in l[index].split(',')] if l[index] else [] + assert related_nodes == list(getattr(workflow_job.workflow_nodes.all()[i], relationship).all().values_list('id', flat=True)), \ + f"(right side) workflow_nodes.all()[{i}].{relationship}.all()" From 58c821f3e10eb9c96505037e1d95782d11829b3b Mon Sep 17 00:00:00 2001 From: Bill Nottingham Date: Tue, 21 Apr 2020 16:25:49 -0400 Subject: [PATCH 4/4] De-flake the collector test. --- .../functional/analytics/test_collectors.py | 89 ++++++++++++------- 1 file changed, 56 insertions(+), 33 deletions(-) diff --git a/awx/main/tests/functional/analytics/test_collectors.py b/awx/main/tests/functional/analytics/test_collectors.py index 91aaf7ffbc..d1c426b8ec 100644 --- a/awx/main/tests/functional/analytics/test_collectors.py +++ b/awx/main/tests/functional/analytics/test_collectors.py @@ -22,61 +22,72 @@ from awx.main.models import ( def sqlite_copy_expert(request): # copy_expert is postgres-specific, and SQLite doesn't support it; mock its # behavior to test that it writes a file that contains stdout from events - path = tempfile.mkdtemp(prefix='copied_tables') + path = tempfile.mkdtemp(prefix="copied_tables") def write_stdout(self, sql, fd): # Would be cool if we instead properly disected the SQL query and verified # it that way. But instead, we just take the nieve approach here. 
- assert sql.startswith('COPY (') - assert sql.endswith(') TO STDOUT WITH CSV HEADER') + assert sql.startswith("COPY (") + assert sql.endswith(") TO STDOUT WITH CSV HEADER") - sql = sql.replace('COPY (', '') - sql = sql.replace(') TO STDOUT WITH CSV HEADER', '') + sql = sql.replace("COPY (", "") + sql = sql.replace(") TO STDOUT WITH CSV HEADER", "") # sqlite equivalent - sql = sql.replace('ARRAY_AGG', 'GROUP_CONCAT') + sql = sql.replace("ARRAY_AGG", "GROUP_CONCAT") # Remove JSON style queries # TODO: could replace JSON style queries with sqlite kind of equivalents sql_new = [] - for line in sql.split('\n'): - if line.find('main_jobevent.event_data::') == -1: + for line in sql.split("\n"): + if line.find("main_jobevent.event_data::") == -1: sql_new.append(line) - elif not line.endswith(','): - sql_new[-1] = sql_new[-1].rstrip(',') - sql = '\n'.join(sql_new) + elif not line.endswith(","): + sql_new[-1] = sql_new[-1].rstrip(",") + sql = "\n".join(sql_new) self.execute(sql) results = self.fetchall() headers = [i[0] for i in self.description] - csv_handle = csv.writer(fd, delimiter=',', quoting=csv.QUOTE_ALL, escapechar='\\', lineterminator='\n') + csv_handle = csv.writer( + fd, + delimiter=",", + quoting=csv.QUOTE_ALL, + escapechar="\\", + lineterminator="\n", + ) csv_handle.writerow(headers) csv_handle.writerows(results) - - setattr(SQLiteCursorWrapper, 'copy_expert', write_stdout) + setattr(SQLiteCursorWrapper, "copy_expert", write_stdout) request.addfinalizer(lambda: shutil.rmtree(path)) - request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, 'copy_expert')) + request.addfinalizer(lambda: delattr(SQLiteCursorWrapper, "copy_expert")) return path @pytest.mark.django_db -def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, job_template): - ''' +def test_copy_tables_unified_job_query( + sqlite_copy_expert, project, inventory, job_template +): + """ Ensure that various unified job types are in the output of the query. 
- ''' + """ time_start = now() - inv_src = InventorySource.objects.create(name="inventory_update1", inventory=inventory, source='gce') + inv_src = InventorySource.objects.create( + name="inventory_update1", inventory=inventory, source="gce" + ) - project_update_name = ProjectUpdate.objects.create(project=project, name="project_update1").name + project_update_name = ProjectUpdate.objects.create( + project=project, name="project_update1" + ).name inventory_update_name = inv_src.create_unified_job().name job_name = job_template.create_unified_job().name with tempfile.TemporaryDirectory() as tmpdir: - collectors.copy_tables(time_start, tmpdir, subset='unified_jobs') - with open(os.path.join(tmpdir, 'unified_jobs_table.csv')) as f: - lines = ''.join([l for l in f]) + collectors.copy_tables(time_start, tmpdir, subset="unified_jobs") + with open(os.path.join(tmpdir, "unified_jobs_table.csv")) as f: + lines = "".join([l for l in f]) assert project_update_name in lines assert inventory_update_name in lines @@ -84,7 +95,7 @@ def test_copy_tables_unified_job_query(sqlite_copy_expert, project, inventory, j @pytest.fixture -def workflow_job(states=['new', 'new', 'new', 'new', 'new']): +def workflow_job(states=["new", "new", "new", "new", "new"]): """ Workflow topology: node[0] @@ -98,8 +109,11 @@ def workflow_job(states=['new', 'new', 'new', 'new', 'new']): node[2] node[4] """ wfj = WorkflowJob.objects.create() - jt = JobTemplate.objects.create(name='test-jt') - nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 6)] + jt = JobTemplate.objects.create(name="test-jt") + nodes = [ + WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) + for i in range(0, 6) + ] for node, state in zip(nodes, states): if state: node.job = jt.create_job() @@ -119,8 +133,8 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job): time_start = now() with tempfile.TemporaryDirectory() as tmpdir: - 
collectors.copy_tables(time_start, tmpdir, subset='workflow_job_node_query') - with open(os.path.join(tmpdir, 'workflow_job_node_table.csv')) as f: + collectors.copy_tables(time_start, tmpdir, subset="workflow_job_node_query") + with open(os.path.join(tmpdir, "workflow_job_node_table.csv")) as f: reader = csv.reader(f) # Pop the headers next(reader) @@ -128,10 +142,19 @@ def test_copy_tables_workflow_job_node_query(sqlite_copy_expert, workflow_job): ids = [int(l[0]) for l in lines] - assert ids == list(workflow_job.workflow_nodes.all().values_list('id', flat=True)) + assert ids == list( + workflow_job.workflow_nodes.all().values_list("id", flat=True) + ) - for index, relationship in zip([7, 8, 9], ['success_nodes', 'failure_nodes', 'always_nodes']): + for index, relationship in zip( + [7, 8, 9], ["success_nodes", "failure_nodes", "always_nodes"] + ): for i, l in enumerate(lines): - related_nodes = [int(e) for e in l[index].split(',')] if l[index] else [] - assert related_nodes == list(getattr(workflow_job.workflow_nodes.all()[i], relationship).all().values_list('id', flat=True)), \ - f"(right side) workflow_nodes.all()[{i}].{relationship}.all()" + related_nodes = ( + [int(e) for e in l[index].split(",")] if l[index] else [] + ) + assert related_nodes == list( + getattr(workflow_job.workflow_nodes.all()[i], relationship) + .all() + .values_list("id", flat=True) + ), f"(right side) workflow_nodes.all()[{i}].{relationship}.all()"