aboutsummaryrefslogtreecommitdiff
#!/bin/python3

from sys import argv
from threading import Thread
from time import sleep
import unbound
import psycopg2

# our own module used by several scripts in the project
from ztdnslib import start_db_connection, get_ztdns_config, log, set_loghour

class dns_queries:
    def __init__(self, dns_IP, dns_id, services):
        self.dns_IP = dns_IP
        self.dns_id = dns_id
        self.services = services

class single_query:
    def __init__(self, hour, cursor, vpn_id, dns_id, service_id):
        self.hour = hour
        self.cursor = cursor
        self.vpn_id = vpn_id
        self.dns_id = dns_id
        self.service_id = service_id

def query_planned_queries(cursor, hour, vpn_id):
    # return [
    #     #           dns server IP   | dns server id | service_id | service_name
    #     dns_queries("195.98.79.117",  23,           [[89,          "devuan.org"],
    #                                                  [44,          "gry.pl"],
    #                                                  [112,         "invidio.us"]]),
    #     dns_queries("192.71.245.208", 33,           [[77,          "debian.org"],
    #                                                  [22,          "nie.ma.takiej.domeny"],
    #                                                  [100,         "onet.pl"]])
    # ]
    cursor.execute('''
    SELECT DISTINCT d."IP", d.id
    FROM user_side_queries AS q JOIN user_side_dns AS d
    ON d.id = q.dns_id
    WHERE q.vpn_id = %s
    ''', (vpn_id,))
    dnss = cursor.fetchall()

    dnss_to_query = []
    
    for dns_IP, dns_id in dnss:
        cursor.execute('''
        SELECT s.id, s.web_address
        FROM user_side_service AS s JOIN user_side_queries AS q
        ON s.id = q.service_id
        WHERE q.vpn_id = %s AND q.dns_id = %s
        ''', (vpn_id, dns_id))
        
        queries = dns_queries(dns_IP, dns_id, cursor.fetchall())
        
        dnss_to_query.append(queries)

    return dnss_to_query

def resolve_call_back(mydata, status, result):
    global dups

    query = mydata
    # debugging
    print("callback called for {}".format(result.qname))
    if status != 0:
        result_info = 'internal failure: out of memory'
    elif result.rcode == 0:
        result_info = 'successful'
        print("Result:",result.data.address_list)
    elif result.rcode == 2:
        result_info = 'no response'
    elif result.rcode == 3:
        result_info = 'not exists'
    else:
        result_info = 'DNS error: {}'.format(result.rcode_str)

    # write to database
    try:
        query.cursor.connection.autocommit = False
        query.cursor.execute('''
        INSERT INTO user_side_responses
            (date, result, dns_id, service_id, vpn_id)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING id
        ''', (query.hour, result_info, query.dns_id,
              query.service_id, query.vpn_id))

        responses_id = query.cursor.fetchone()[0]

        if status == 0:
            # an even better solution would be to have a trigger delete
            # the record when validity reaches 0
            query.cursor.execute('''
            UPDATE user_side_queries
            SET validity = validity - 1
            WHERE dns_id = %s AND service_id = %s AND vpn_id = %s;

            DELETE FROM user_side_queries
            WHERE dns_id = %s AND service_id = %s AND vpn_id = %s AND
                  validity < 1;
            ''', (query.dns_id, query.service_id, query.vpn_id,
                  query.dns_id, query.service_id, query.vpn_id))

        query.cursor.connection.commit()
        query.cursor.connection.autocommit = True
        
        if status == 0 and result.havedata:
            for address in result.data.address_list:
                query.cursor.execute('''
                INSERT INTO user_side_response (returned_ip, responses_id)
                VALUES(%s, %s)
                ''', (address, responses_id))

    except psycopg2.IntegrityError:
        query.cursor.connection.rollback()
        # Unique constraint is stopping us from adding duplicates;
        # This is most likey because back-end has been run multiple times
        # during the same hour (bad configuration or admin running manually
        # after cron), we'll write to logs about that.
        dups = True

dups = False
hour = argv[1]
set_loghour(hour) # log() function will now prepend messages with hour
vpn_id = argv[2]
config = get_ztdns_config()

def query_dns(dns_queries):
    connection = start_db_connection(config)
    cursor = connection.cursor()
    ctx = unbound.ub_ctx()
    ctx.set_fwd(dns_queries.dns_IP)
    
    first = True
    for service_id, service_name in dns_queries.services:
        if first:
            first = False
        else:
            sleep(0.4) # throttle between queries

        print("starting resolution of {} through {}".format(service_name,
                                                            dns_queries.dns_IP))
        query = single_query(hour, cursor, vpn_id,
                             dns_queries.dns_id, service_id)

        ctx.resolve_async(service_name, query, resolve_call_back,
                          unbound.RR_TYPE_A, unbound.RR_CLASS_IN)

    ctx.wait()
    cursor.close()
    connection.close()

connection = start_db_connection(config)
cursor = connection.cursor()
planned_queries = query_planned_queries(cursor, hour, vpn_id)
# each thread will make its own connection
cursor.close()
connection.close()
    
threads = []
for dns_queries in planned_queries:
    thread = Thread(target = query_dns, args = (dns_queries,))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

if dups:
    log('results already exist for vpn {}'.format(vpn_id))