diff --git a/push-multiple-to-influx.py b/push-multiple-to-influx.py index c62bf37c4cee96216917a254a59242e13ee3ae41..1115e2e7552a66a0ef212f3f41475102ee05d052 100755 --- a/push-multiple-to-influx.py +++ b/push-multiple-to-influx.py @@ -2,8 +2,20 @@ # -*- coding: utf-8 -*- import os import subprocess +import multiprocessing +import sys +import time import yaml + +def work(cmd): + print '[{}] Calling {}'.format(multiprocessing.current_process(), ' '.join(cmd)) + subprocess.call(cmd) + sys.stdout.flush() + time.sleep(1) + return cmd + + if __name__ == '__main__': with open('push-multiple-to-influx.yml') as fdr: content = yaml.load(fdr) @@ -14,16 +26,28 @@ if __name__ == '__main__': sizes = content['sizes'] queries = content['queries'] + count = multiprocessing.cpu_count() + pool = multiprocessing.Pool(processes=count) + commands = [] + for solution_name in solutions: fileformat = solutions[solution_name]['fileformat'] for size in sizes: size = str(size) for query in queries: query = str(query) - filename = os.path.join(base_dir, solution_name, fileformat.replace(r'%size', size).replace(r'%query', query)) + basename = fileformat.replace(r'%size', size).replace(r'%query', query) + filename = os.path.join(base_dir, solution_name, basename) if not os.path.exists(filename): print 'File "{}" not found'.format(filename) continue - cmd = [python_executable, '-f', filename, '-s', size, '-q', query, '-n', solution_name] - print 'Calling ' + ' '.join(cmd) - subprocess.call(cmd) + cmd = [python_executable, '-f', filename, + '-s', size, + '-q', query, + '-n', solution_name] + commands.append(cmd) + print len(commands) + results = [] + r = pool.map_async(work, commands, callback=results.append) + r.wait() + print len(results) diff --git a/push-multiple-to-influx.yml b/push-multiple-to-influx.yml index c0ad0f1732cd824394679b213ccc9bc9b78bd2c7..f60ec11a1210f34196fccc80180cffca87a11854 100644 --- a/push-multiple-to-influx.yml +++ b/push-multiple-to-influx.yml @@ -5,9 +5,18 @@ solutions: fileformat: events-xml-%size-%query.csv jastadd-ttc18-xml-inc: fileformat: events-xml-%size-%query.csv + jastadd-ttc18-relast-xml-flush: + fileformat: events-xml-%size-%query.csv + jastadd-ttc18-relast-xml-inc: + fileformat: events-xml-%size-%query.csv sizes: - 1 - 2 + - 4 + - 8 + - 16 + - 32 + - 64 queries: - Q1 - Q2 diff --git a/solve/push-to-influx.py b/solve/push-to-influx.py index d510db63e29b1d133cd4831afdf5117847d4d566..c5beb4a3d971aade244f537da583b3b11fb06e42 100755 --- a/solve/push-to-influx.py +++ b/solve/push-to-influx.py @@ -5,7 +5,7 @@ import csv from datetime import datetime import re import os -import sys +import subprocess from influxdb import InfluxDBClient from influxdb import SeriesHelper @@ -43,7 +43,7 @@ class MySeriesHelper(SeriesHelper): # Defines the number of data points to store prior to writing # on the wire. - bulk_size = 10000 + bulk_size = 100000 # autocommit must be set to True when using bulk_size autocommit = True @@ -55,6 +55,14 @@ def nice_tag(s): return s +def wccount(filename): + out = subprocess.Popen(['wc', '-l', filename], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ).communicate()[0] + return int(out.partition(b' ')[0]) + + def main(args): if args.drop_database: myclient.drop_database(dbname) @@ -93,7 +101,8 @@ if __name__ == '__main__': parser.add_argument("-s", "--size", help="Size of change set", required=True) parser.add_argument("-q", "--query", help="Computed query", required=True) parser.add_argument("-n", "--name", help="Name of the solution", default='jastadd-ttc18') - parser.add_argument("--drop_database", help="Whether the database should be dropped beforehand (Default: false)", action='store_true') + parser.add_argument("--drop_database", action='store_true', + help="Whether the database should be dropped beforehand (Default: false)") args = parser.parse_args() main(args)