path: root/src/perform_queries.py

#!/usr/bin/env python3

from sys import argv
from threading import Thread
from time import sleep
import unbound
import psycopg2

# our own module used by several scripts in the project
from ztdnslib import start_db_connection, get_ztdns_config, log, set_loghour
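# (assumed interfaces, for readers without ztdnslib at hand:
#  get_ztdns_config()          -> project configuration incl. DB credentials,
#  start_db_connection(config) -> an open psycopg2 connection,
#  set_loghour(hour)           -> remember the hour that log() prepends,
#  log(message)                -> append a line to the project's log)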

class dns_queries:
    """One DNS server (IP address and DB id) plus the services to resolve through it."""
    def __init__(self, dns_IP, dns_id, services):
        self.dns_IP = dns_IP
        self.dns_id = dns_id
        self.services = services

class single_query:
    """Everything the resolver callback needs to record one query's result."""
    def __init__(self, hour, cursor, vpn_id, dns_id, service_id):
        self.hour = hour
        self.cursor = cursor
        self.vpn_id = vpn_id
        self.dns_id = dns_id
        self.service_id = service_id

def query_planned_queries(cursor, hour, vpn_id):
    # return [
    #     #           dns server IP   | dns server id | service_id | service_name
    #     dns_queries("195.98.79.117",  23,           [[89,          "devuan.org"],
    #                                                  [44,          "gry.pl"],
    #                                                  [112,         "invidio.us"]]),
    #     dns_queries("192.71.245.208", 33,           [[77,          "debian.org"],
    #                                                  [22,          "nie.ma.takiej.domeny"],
    #                                                  [100,         "onet.pl"]])
    # ]
    cursor.execute('''
    SELECT DISTINCT d."IP", d.id
    FROM user_side_queries AS q JOIN user_side_dns AS d
    ON d.id = q.dns_id
    WHERE q.vpn_id = %s
    ''', (vpn_id,))
    dnss = cursor.fetchall()

    dnss_to_query = []
    
    for dns_IP, dns_id in dnss:
        cursor.execute('''
        SELECT s.id, s.name
        FROM user_side_service AS s JOIN user_side_queries AS q
        ON s.id = q.service_id
        WHERE q.vpn_id = %s AND q.dns_id = %s
        ''', (vpn_id, dns_id))
        
        queries = dns_queries(dns_IP, dns_id, cursor.fetchall())
        
        dnss_to_query.append(queries)

    return dnss_to_query

def resolve_call_back(mydata, status, result):
    global dups

    query = mydata
    # debugging
    print("callback called for {}".format(result.qname))
    if status != 0:
        # nonzero status means the lookup itself failed (e.g. out of memory)
        result_info = 'internal failure: out of memory'
    elif result.rcode == 0:  # NOERROR
        result_info = 'successful'
        if result.havedata:  # NOERROR answers can still carry no A records
            print("Result:", result.data.address_list)
    elif result.rcode == 2:  # SERVFAIL
        result_info = 'no response'
    elif result.rcode == 3:  # NXDOMAIN
        result_info = 'not exists'
    else:
        result_info = 'DNS error: {}'.format(result.rcode_str)

    # write to database; the response INSERT and the validity bookkeeping
    # below are grouped into a single transaction
    try:
        query.cursor.connection.autocommit = False
        query.cursor.execute('''
        INSERT INTO user_side_responses
            (date, result, dns_id, service_id, vpn_id)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING id
        ''', (query.hour, result_info, query.dns_id,
              query.service_id, query.vpn_id))

        responses_id = query.cursor.fetchone()[0]

        if status == 0:
            # an even better solution would be to have a trigger delete
            # the record when validity reaches 0
            query.cursor.execute('''
            UPDATE user_side_queries
            SET validity = validity - 1
            WHERE dns_id = %s AND service_id = %s AND vpn_id = %s;

            DELETE FROM user_side_queries
            WHERE dns_id = %s AND service_id = %s AND vpn_id = %s AND
                  validity < 1;
            ''', (query.dns_id, query.service_id, query.vpn_id,
                  query.dns_id, query.service_id, query.vpn_id))
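            # a sketch of the trigger alternative mentioned above (assumed
            # schema with an integer primary key `id`; not used by this script):
            #
            #   CREATE FUNCTION drop_spent_query() RETURNS trigger AS $$
            #   BEGIN
            #       IF NEW.validity < 1 THEN
            #           DELETE FROM user_side_queries WHERE id = NEW.id;
            #       END IF;
            #       RETURN NULL;
            #   END;
            #   $$ LANGUAGE plpgsql;
            #
            #   CREATE TRIGGER queries_validity_cleanup
            #   AFTER UPDATE OF validity ON user_side_queries
            #   FOR EACH ROW EXECUTE PROCEDURE drop_spent_query();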

        query.cursor.connection.commit()
        query.cursor.connection.autocommit = True
        
        if status == 0 and result.havedata:
            for address in result.data.address_list:
                query.cursor.execute('''
                INSERT INTO user_side_response (returned_ip, responses_id)
                VALUES(%s, %s)
                ''', (address, responses_id))

    except psycopg2.IntegrityError:
        query.cursor.connection.rollback()
        # A unique constraint is stopping us from adding duplicates.
        # Most likely the back-end has been run more than once during the
        # same hour (bad configuration, or an admin running it manually
        # after cron); we log that at the end of the script.
        dups = True
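        # (assumption: the violated constraint is something like
        #  UNIQUE (date, dns_id, service_id, vpn_id) on user_side_responses)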

dups = False
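# expected invocation (assumed; values below are purely illustrative):
#   ./perform_queries.py <hour> <vpn_id>
#   e.g. ./perform_queries.py '2021-01-01 13:00:00' 7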
hour = argv[1]
set_loghour(hour) # log() function will now prepend messages with hour
vpn_id = argv[2]
config = get_ztdns_config()

def query_dns(dns_queries):
    connection = start_db_connection(config)
    cursor = connection.cursor()
    ctx = unbound.ub_ctx()
    ctx.set_fwd(dns_queries.dns_IP)
    
    first = True
    for service_id, service_name in dns_queries.services:
        if first:
            first = False
        else:
            sleep(0.4) # throttle between queries

        print("starting resolution of {} through {}".format(service_name,
                                                            dns_queries.dns_IP))
        query = single_query(hour, cursor, vpn_id,
                             dns_queries.dns_id, service_id)

        ctx.resolve_async(service_name, query, resolve_call_back,
                          unbound.RR_TYPE_A, unbound.RR_CLASS_IN)

    # block until every async query on this context has finished;
    # resolve_call_back is invoked here, in this same thread
    ctx.wait()
    cursor.close()
    connection.close()

connection = start_db_connection(config)
cursor = connection.cursor()
planned_queries = query_planned_queries(cursor, hour, vpn_id)
# each thread will make its own connection, so no cursor is ever shared between threads
cursor.close()
connection.close()
    
threads = []
for queries in planned_queries:
    thread = Thread(target = query_dns, args = (queries,))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

if dups:
    log('results already exist for vpn {}'.format(vpn_id))