From 947b2e7d138e5daee56d8198b36b21672f9a517d Mon Sep 17 00:00:00 2001 From: Wojciech Kosior Date: Mon, 15 Jun 2020 21:38:50 +0200 Subject: use separate connection for each thread --- src/perform_queries.py | 64 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/src/perform_queries.py b/src/perform_queries.py index 688f4c5..fef1300 100755 --- a/src/perform_queries.py +++ b/src/perform_queries.py @@ -4,6 +4,7 @@ from sys import argv from threading import Thread from time import sleep import unbound +import psycopg2 # our own module used by several scripts in the project from ztdns_db_connectivity import start_db_connection, get_ztdns_config @@ -66,46 +67,69 @@ def resolve_call_back(mydata, status, result): else: result_info = 'not found' # write to database - query.cursor.execute(''' - INSERT INTO user_side_responses (date, result, dns_id, service_id, vpn_id) - VALUES (%s, %s, %s, %s, %s) - RETURNING id - ''', (query.hour, result_info, query.dns_id, - query.service_id, query.vpn_id)) - - responses_id = query.cursor.fetchone()[0] - - if status==0 and result.havedata: - for address in result.data.address_list: - query.cursor.execute(''' - INSERT INTO user_side_response (returned_ip, responses_id) - VALUES(%s, %s) - ''', (address, responses_id)) + try: + query.cursor.execute(''' + INSERT INTO user_side_responses + (date, result, dns_id, service_id, vpn_id) + VALUES (%s, %s, %s, %s, %s) + RETURNING id + ''', (query.hour, result_info, query.dns_id, + query.service_id, query.vpn_id)) + + responses_id = query.cursor.fetchone()[0] + + if status==0 and result.havedata: + for address in result.data.address_list: + query.cursor.execute(''' + INSERT INTO user_side_response (returned_ip, responses_id) + VALUES(%s, %s) + ''', (address, responses_id)) + except psycopg2.IntegrityError: + # Unique constraint is stopping us from adding duplicates; + # This is most likey because back-end has been run multiple times + # during the same hour (bad configuration or admin running manually + # after cron) + pass # no committing, since auto-commit mode is set on the connection hour = argv[1] vpn_id = argv[2] config = get_ztdns_config() -connection = start_db_connection(config) -cursor = connection.cursor() def query_dns(dns_queries): + connection = start_db_connection(config) + cursor = connection.cursor() ctx = unbound.ub_ctx() ctx.set_fwd(dns_queries.dns_IP) + + first = True for service_id, service_name in dns_queries.services: + if first: + first = False + else: + sleep(0.4) # throttle between queries + print("starting resolution of {} through {}".format(service_name, dns_queries.dns_IP)) query = single_query(hour, cursor, vpn_id, dns_queries.dns_id, service_id) - + ctx.resolve_async(service_name, query, resolve_call_back, unbound.RR_TYPE_A, unbound.RR_CLASS_IN) - sleep(0.4) ctx.wait() + cursor.close() + connection.close() +connection = start_db_connection(config) +cursor = connection.cursor() +planned_queries = query_planned_queries(cursor, hour, vpn_id) +# each thread will make its own connection +cursor.close() +connection.close() + threads = [] -for dns_queries in query_planned_queries(cursor, hour, vpn_id): +for dns_queries in planned_queries: thread = Thread(target = query_dns, args = (dns_queries,)) thread.start() threads.append(thread) -- cgit v1.2.3