Skip to main content

Реагирование на BIFIT Mitigator

Данный скрипт добавлен пользователем KUMA Community и предоставляется AS IS без каких-либо гарантий и ответственности.

Описание

Скрипт позволяет осуществить временную блокировку трафика (tbl) в политике, которая подпадает под указанные параметры (src_ip, dst_ip, src_port, dst_port, protocol) или же в указанной политике.  


Аргументы

--server указываем ip адрес сервера, можно захаркодить в скрипте в hostnames = ['x.x.x.x', 'y.y.y.y'] # Вставить список хостов 
--user указываем имя пользователя, можно захаркодить в скрипте в user = 'login' # Заменить логин
--password указываем имя пользователя, можно захаркодить в скрипте в passwd = 'passwd' # Заменить пароль
--policy указываем policy_id, если известно заранее
-s, --ip_src IP адрес источника для блокировки
-d, --ip_dst IP адрес назначения для блокировки
-t, --time время на которое будет заблокирован трафик
-p, --port_src Порт источника для блокировки
-o, --port_dst Порт назначения для блокировки
-P, --protocol Протокол для блокировки


Правило реагирования

image.png


Скрипт

Скрипт на языке python доступен под спойлером

mitigator_block.py
#!/bin/python3

import argparse
import json
import ssl 
import sys


if sys.version_info >= (3, ):
        from urllib.request import Request, urlopen
        from urllib.error import HTTPError
else:
        from urllib2 import Request, urlopen, HTTPError

class RequestEx(Request):
    def __init__(self, *args, **kwargs):
        self._method = kwargs.pop('method', None)
        Request.__init__(self, *args, **kwargs)

    # 'add_data(str)' removed in Python 3.4 in favour of 'Request.data: bytes'
    def add_data(self, data):
        if hasattr(self, 'data'):
            self.data = data.encode('utf-8')
        else:
            Request.add_data(self, data)

    # Request.method was added in Python 3.3
    def get_method(self):
        return self._method if self._method else Request.get_method(self)
    
class MitigatorException(BaseException):
    def __init__(self, message):
        self.message = message

    def __str__(self):
        return self.message
    
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--server', help='Mitigator host')
    parser.add_argument('--user', help='Mitigator login')
    parser.add_argument('--password', help='Mitigator password')
    parser.add_argument('--policy', help='policy ID (as shown in URL)')
    parser.add_argument('--no-verify', help='disable TLS certificate validation', action='store_true')
    parser.add_argument('-s', '--ip_src',  required=True, help='IP address to block')
    parser.add_argument('-d', '--ip_dst', required=True, help='IP address to block')
    parser.add_argument('-t', '--time', required=True, help='block time in seconds', type=int)
    parser.add_argument('-p', '--port_src',  help='Port src to block', type=int)
    parser.add_argument('-o', '--port_dst',  help='Port dst to block', type=int)
    parser.add_argument('-P', '--protocol',  help='Protocol to block')
    return parser.parse_args()

def make_request(hostname, uri, method=None, token=None, policy=None, data=None, parameters=None):

    url = f'https://{hostname}/api/v4/{uri}'

    if parameters is not None:
        url = f'https://{hostname}/api/v4/{uri}?{parameters}'

    request = RequestEx(url, method=method)
    if token is not None:
        request.add_header('X-Auth-Token', token)
    if data is not None:
        request.add_data(json.dumps(data))
   
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    ctx = ctx
    try:
        response = urlopen(request, context=ctx)
    except HTTPError as e:
        try:
            raise MitigatorException(e.fp.read())
        except IOError:
            raise e
    return json.load(response)['data']


def block_traffic(hostname, token, options, policy_id):
#Вносим во временную блокировку ip адрес
    if policy_id:
        tbl_install = make_request(hostname, 
                                f'tbl/items?policy={policy_id}&no_logs=false&source=1', 
                                token=token, 
                                method='POST', 
                                data={
                                        "items": [
                                            {
                                                "prefix": options.ip_src,
                                                "ttl": options.time
                                            }
                                        ]
                                    } )
        tbl_install_check = make_request(hostname, f'tbl/list?policy={policy_id}', token=token, method='GET')


def search_policy(hostname, token, options):
    data_request = ''
    if options.ip_src is not None:
        data_request += f'src_address={options.ip_src}'
    if options.ip_dst is not None:
        data_request += f'&dst_address={options.ip_dst}'
    if options.port_src is not None:
        data_request += f'&src_port={options.port_src}'
    else:
        data_request += '&src_port=88' # обязательный параметр при запросе policy_id, но в политике может быть не настроен, подставляем произвольный
    if options.port_dst is not None:
        data_request += f'&dst_port={options.port_dst}'
    else:
        data_request += '&dst_port=88' # обязательный параметр при запросе policy_id, но в политике может быть не настроен, подставляем произвольный
    if options.protocol is not None:
        data_request += f'&protocol={options.protocol}' 
    else:
        data_request += '&protocol=TCP' # обязательный параметр при запросе policy_id, но в политике может быть не настроен, подставляем произвольный
    policy_request = make_request(hostname, f'policies/by_inbound_packet', token=token, method='GET', parameters=data_request)
    policy_id = policy_request.get('policy_id')
    return policy_id  

if __name__ == '__main__':
    options = parse_args()
    if options.server is not None:
        hostnames = [options.server]
    else:
        hostnames = ['x.x.x.x', 'y.y.y.y'] # Заменить список хостов 
    if options.user is not None:
        user = options.user
    else:
        user = 'user' # Заменить логин
    if options.password is not None:
        passwd = options.password
    else:
        passwd = 'passwd' # Заменить пароль
    
    for host in hostnames:
        url_prefix = f"https://{host}/api/v4"
        try:
            data = make_request(host, 'users/session', data={'username': user, 'password': passwd})
            token = data['token']
            if options.policy is None:
                policy_id = search_policy(host, token, options)
                block_traffic(host, token, options, policy_id)
            else:
                block_traffic(host, token, options, options.policy)
        except:
            continue