aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWojciech Kosior <kwojtus@protonmail.com>2020-06-15 21:38:50 +0200
committerWojciech Kosior <kwojtus@protonmail.com>2020-06-15 21:38:50 +0200
commit947b2e7d138e5daee56d8198b36b21672f9a517d (patch)
treeae5216313ecc3716287b9ad85cd21d997363f01d
parent41693328880905b719f458c37b6c8c7bdc9ab2ac (diff)
download0tdns-947b2e7d138e5daee56d8198b36b21672f9a517d.tar.gz
0tdns-947b2e7d138e5daee56d8198b36b21672f9a517d.zip
use separate connection for each thread
-rwxr-xr-xsrc/perform_queries.py64
1 file changed, 44 insertions, 20 deletions
diff --git a/src/perform_queries.py b/src/perform_queries.py
index 688f4c5..fef1300 100755
--- a/src/perform_queries.py
+++ b/src/perform_queries.py
@@ -4,6 +4,7 @@ from sys import argv
from threading import Thread
from time import sleep
import unbound
+import psycopg2
# our own module used by several scripts in the project
from ztdns_db_connectivity import start_db_connection, get_ztdns_config
@@ -66,46 +67,69 @@ def resolve_call_back(mydata, status, result):
else:
result_info = 'not found'
# write to database
- query.cursor.execute('''
- INSERT INTO user_side_responses (date, result, dns_id, service_id, vpn_id)
- VALUES (%s, %s, %s, %s, %s)
- RETURNING id
- ''', (query.hour, result_info, query.dns_id,
- query.service_id, query.vpn_id))
-
- responses_id = query.cursor.fetchone()[0]
-
- if status==0 and result.havedata:
- for address in result.data.address_list:
- query.cursor.execute('''
- INSERT INTO user_side_response (returned_ip, responses_id)
- VALUES(%s, %s)
- ''', (address, responses_id))
+ try:
+ query.cursor.execute('''
+ INSERT INTO user_side_responses
+ (date, result, dns_id, service_id, vpn_id)
+ VALUES (%s, %s, %s, %s, %s)
+ RETURNING id
+ ''', (query.hour, result_info, query.dns_id,
+ query.service_id, query.vpn_id))
+
+ responses_id = query.cursor.fetchone()[0]
+
+ if status==0 and result.havedata:
+ for address in result.data.address_list:
+ query.cursor.execute('''
+ INSERT INTO user_side_response (returned_ip, responses_id)
+ VALUES(%s, %s)
+ ''', (address, responses_id))
+ except psycopg2.IntegrityError:
+ # Unique constraint is stopping us from adding duplicates;
+ # This is most likely because the back-end has been run multiple times
+ # during the same hour (bad configuration or admin running manually
+ # after cron)
+ pass
# no committing, since auto-commit mode is set on the connection
hour = argv[1]
vpn_id = argv[2]
config = get_ztdns_config()
-connection = start_db_connection(config)
-cursor = connection.cursor()
def query_dns(dns_queries):
+ connection = start_db_connection(config)
+ cursor = connection.cursor()
ctx = unbound.ub_ctx()
ctx.set_fwd(dns_queries.dns_IP)
+
+ first = True
for service_id, service_name in dns_queries.services:
+ if first:
+ first = False
+ else:
+ sleep(0.4) # throttle between queries
+
print("starting resolution of {} through {}".format(service_name,
dns_queries.dns_IP))
query = single_query(hour, cursor, vpn_id,
dns_queries.dns_id, service_id)
-
+
ctx.resolve_async(service_name, query, resolve_call_back,
unbound.RR_TYPE_A, unbound.RR_CLASS_IN)
- sleep(0.4)
ctx.wait()
+ cursor.close()
+ connection.close()
+connection = start_db_connection(config)
+cursor = connection.cursor()
+planned_queries = query_planned_queries(cursor, hour, vpn_id)
+# each thread will make its own connection
+cursor.close()
+connection.close()
+
threads = []
-for dns_queries in query_planned_queries(cursor, hour, vpn_id):
+for dns_queries in planned_queries:
thread = Thread(target = query_dns, args = (dns_queries,))
thread.start()
threads.append(thread)