add the ability to load lots of jobs with firehose.py

$ awx-python tools/scripts/firehose.py --jobs 5000000 --events 100000000
This commit is contained in:
Ryan Petrello
2020-02-18 08:55:06 -05:00
parent f57fff732e
commit 4d06c812e6

View File

@@ -28,12 +28,21 @@ import argparse
import datetime
import json
import multiprocessing
import pkg_resources
import subprocess
import sys
from io import StringIO
from time import time
from random import randint
from uuid import uuid4
import psycopg2
from django import setup as setup_django
from django.db import connection
from django.db.models.sql import InsertQuery
from django.utils.timezone import now
db = json.loads(
    subprocess.check_output(
        ['awx-manage', 'print_settings', 'DATABASES', '--format', 'json']
@@ -110,14 +119,39 @@ def cleanup(sql):
conn.close()
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser()
-    parser.add_argument('job')
-    parser.add_argument('--chunk', type=int, default=1000000000)  # 1B by default
-    params = parser.parse_args()
-    chunk = params.chunk
+def generate_jobs(jobs):
+    print(f'inserting {jobs} job(s)')
+    sys.path.insert(0, pkg_resources.get_distribution('awx').module_path)
+    from awx import prepare_env
+    prepare_env()
+    setup_django()
    print(datetime.datetime.utcnow().isoformat())

    from awx.main.models import UnifiedJob, Job, JobTemplate

    def make_batch(N):
        jt = JobTemplate.objects.first()
        jobs = [Job(job_template=jt, status='canceled', created=now(), modified=now(), elapsed=0.) for i in range(N)]
        ujs = UnifiedJob.objects.bulk_create(jobs)
        query = InsertQuery(Job)
        fields = list(set(Job._meta.fields) - set(UnifiedJob._meta.fields))
        query.insert_values(fields, ujs)
        with connection.cursor() as cursor:
            query, params = query.sql_with_params()[0]
            cursor.execute(query, params)
        return ujs[-1]

    i = 1
    while jobs > 0:
        s = time()
        print('running batch {}, runtime {}'.format(i, time() - s))
        created = make_batch(min(jobs, 1000))
        print('took {}'.format(time() - s))
        i += 1
        jobs -= 1000
    return created
def generate_events(events, job):
    conn = psycopg2.connect(dsn)
    cursor = conn.cursor()
    print('removing indexes and constraints')
@@ -146,12 +180,12 @@ if __name__ == '__main__':
    print(f'ALTER TABLE main_jobevent DROP CONSTRAINT IF EXISTS {conname}')
    conn.commit()

-    print(f'inserting {chunk} events')
+    print(f'attaching {events} events to job {job}')
    cores = multiprocessing.cpu_count()
    workers = []

    for i in range(cores):
-        p = multiprocessing.Process(target=firehose, args=(params.job, chunk / cores))
+        p = multiprocessing.Process(target=firehose, args=(job, events / cores))
        p.daemon = True
        workers.append(p)
@@ -203,3 +237,15 @@ if __name__ == '__main__':
    cleanup(sql)
    conn.close()
    print(datetime.datetime.utcnow().isoformat())
if __name__ == '__main__':
    # CLI entry point: bulk-insert a pile of jobs, then attach a firehose of
    # events to the most recently created one.
    cli = argparse.ArgumentParser()
    cli.add_argument('--jobs', type=int, default=1000000)  # 1M by default
    cli.add_argument('--events', type=int, default=1000000000)  # 1B by default
    args = cli.parse_args()
    print(datetime.datetime.utcnow().isoformat())
    newest = generate_jobs(args.jobs)
    generate_events(args.events, str(newest.pk))