Commit e09b589c authored by René Schöne

Push to influx now in parallel.

parent 5750292e
@@ -2,8 +2,20 @@
 # -*- coding: utf-8 -*-
 import os
 import subprocess
+import multiprocessing
+import sys
+import time
 import yaml
 
+
+def work(cmd):
+    print '[{}] Calling {}'.format(multiprocessing.current_process(), ' '.join(cmd))
+    subprocess.call(cmd)
+    sys.stdout.flush()
+    time.sleep(1)
+    return cmd
+
+
 if __name__ == '__main__':
     with open('push-multiple-to-influx.yml') as fdr:
         content = yaml.load(fdr)
@@ -14,16 +26,28 @@ if __name__ == '__main__':
     sizes = content['sizes']
     queries = content['queries']
 
+    count = multiprocessing.cpu_count()
+    pool = multiprocessing.Pool(processes=count)
+    commands = []
     for solution_name in solutions:
         fileformat = solutions[solution_name]['fileformat']
         for size in sizes:
             size = str(size)
             for query in queries:
                 query = str(query)
-                filename = os.path.join(base_dir, solution_name, fileformat.replace(r'%size', size).replace(r'%query', query))
+                basename = fileformat.replace(r'%size', size).replace(r'%query', query)
+                filename = os.path.join(base_dir, solution_name, basename)
                 if not os.path.exists(filename):
                     print 'File "{}" not found'.format(filename)
                     continue
-                cmd = [python_executable, '-f', filename, '-s', size, '-q', query, '-n', solution_name]
-                print 'Calling ' + ' '.join(cmd)
-                subprocess.call(cmd)
+                cmd = [python_executable, '-f', filename,
+                       '-s', size,
+                       '-q', query,
+                       '-n', solution_name]
+                commands.append(cmd)
+
+    print len(commands)
+    results = []
+    r = pool.map_async(work, commands, callback=results.append)
+    r.wait()
+    print len(results)
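
The core of the commit: instead of running each push synchronously, the commands are collected and dispatched through a multiprocessing.Pool with one worker per CPU. A minimal, self-contained sketch of this map_async-with-callback pattern (the commands below are placeholders, not the generated influx pushes):

import multiprocessing
import subprocess


def work(cmd):
    # Run one external command; the return value is collected by map_async.
    subprocess.call(cmd)
    return cmd


if __name__ == '__main__':
    # Placeholder command list standing in for the generated pushes.
    commands = [['echo', 'push-1'], ['echo', 'push-2'], ['echo', 'push-3']]

    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    results = []
    # map_async calls the callback exactly once, with the list of all return values.
    r = pool.map_async(work, commands, callback=results.append)
    r.wait()
    print(len(results[0]))  # number of completed commands

Because the callback receives the complete result list in a single call, results in the committed script ends up as a one-element list of lists, so its final print len(results) reports the number of callback invocations rather than the number of pushed files.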
@@ -5,9 +5,18 @@ solutions:
     fileformat: events-xml-%size-%query.csv
   jastadd-ttc18-xml-inc:
     fileformat: events-xml-%size-%query.csv
+  jastadd-ttc18-relast-xml-flush:
+    fileformat: events-xml-%size-%query.csv
+  jastadd-ttc18-relast-xml-inc:
+    fileformat: events-xml-%size-%query.csv
 sizes:
 - 1
 - 2
+- 4
+- 8
+- 16
+- 32
+- 64
 queries:
 - Q1
 - Q2
@@ -5,7 +5,7 @@ import csv
 from datetime import datetime
 import re
 import os
-import sys
+import subprocess
 from influxdb import InfluxDBClient
 from influxdb import SeriesHelper
 
@@ -43,7 +43,7 @@ class MySeriesHelper(SeriesHelper):
         # Defines the number of data points to store prior to writing
         # on the wire.
-        bulk_size = 10000
+        bulk_size = 100000
 
         # autocommit must be set to True when using bulk_size
         autocommit = True
 
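
For reference, bulk_size only batches writes when autocommit is enabled, as the retained comment notes. A SeriesHelper configured along the lines of the influxdb-python documentation looks roughly like this (connection settings, measurement, field and tag names are illustrative, not taken from this repository):

from influxdb import InfluxDBClient, SeriesHelper

# Illustrative connection; the real client and database live elsewhere in the script.
myclient = InfluxDBClient(host='localhost', port=8086, database='ttc18')


class MySeriesHelper(SeriesHelper):
    class Meta:
        client = myclient
        series_name = 'events'       # illustrative measurement name
        fields = ['value']           # illustrative field name
        tags = ['tool', 'query']     # illustrative tag names
        # Points are buffered and flushed in batches of bulk_size;
        # autocommit must be True for bulk_size to take effect.
        bulk_size = 100000
        autocommit = True


# Each instantiation buffers one point; commit() flushes whatever remains.
MySeriesHelper(tool='jastadd-ttc18', query='Q1', value=42.0)
MySeriesHelper.commit()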
@@ -55,6 +55,14 @@ def nice_tag(s):
     return s
 
 
+def wccount(filename):
+    out = subprocess.Popen(['wc', '-l', filename],
+                           stdout=subprocess.PIPE,
+                           stderr=subprocess.STDOUT
+                           ).communicate()[0]
+    return int(out.partition(b' ')[0])
+
+
 def main(args):
     if args.drop_database:
         myclient.drop_database(dbname)
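
The new wccount helper shells out to wc -l and parses the leading number from its output. A quick usage sketch (the file name is made up for illustration):

n_rows = wccount('events-xml-1-Q1.csv')  # hypothetical benchmark CSV
print('pushing {} rows'.format(n_rows))

A dependency-free alternative with the same result would be sum(1 for _ in open(filename)), at the cost of reading the file in Python rather than delegating to wc.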
@@ -93,7 +101,8 @@ if __name__ == '__main__':
     parser.add_argument("-s", "--size", help="Size of change set", required=True)
     parser.add_argument("-q", "--query", help="Computed query", required=True)
     parser.add_argument("-n", "--name", help="Name of the solution", default='jastadd-ttc18')
-    parser.add_argument("--drop_database", help="Whether the database should be dropped beforehand (Default: false)", action='store_true')
+    parser.add_argument("--drop_database", action='store_true',
+                        help="Whether the database should be dropped beforehand (Default: false)")
 
     args = parser.parse_args()
     main(args)