Реагирование на BIFIT Mitigator

Данный скрипт добавлен пользователем KUMA Community и предоставляется AS IS без каких-либо гарантий и ответственности. 
 Описание 
 Скрипт позволяет осуществить временную блокировку трафика ( tbl ) в политике, которая подпадает под указанные параметры ( src_ip , dst_ip , src_port , dst_port , protocol ) или же в указанной политике.   
 
 Аргументы 
 --server указываем ip адрес сервера, можно захаркодить в скрипте в hostnames = ['x.x.x.x', 'y.y.y.y'] # Вставить список хостов  --user указываем имя пользователя, можно захаркодить в скрипте в user = 'login' # Заменить логин --password указываем имя пользователя, можно захаркодить в скрипте в passwd = 'passwd' # Заменить пароль --policy указываем policy_id, если известно заранее -s , --ip_src IP адрес источника для блокировки -d , --ip_dst IP адрес назначения для блокировки -t , --time время на которое будет заблокирован трафик -p , --port_src Порт источника для блокировки -o , --port_dst Порт назначения для блокировки -P , --protocol Протокол для блокировки 
 
 Правило реагирования 
 
 
 Скрипт 
 Скрипт на языке python доступен под спойлером 
 
 mitigator_block.py 
 #!/bin/python3

import argparse
import json
import ssl 
import sys

if sys.version_info >= (3, ):
 from urllib.request import Request, urlopen
 from urllib.error import HTTPError
else:
 from urllib2 import Request, urlopen, HTTPError

class RequestEx(Request):
 def __init__(self, *args, **kwargs):
 self._method = kwargs.pop('method', None)
 Request.__init__(self, *args, **kwargs)

 # 'add_data(str)' removed in Python 3.4 in favour of 'Request.data: bytes'
 def add_data(self, data):
 if hasattr(self, 'data'):
 self.data = data.encode('utf-8')
 else:
 Request.add_data(self, data)

 # Request.method was added in Python 3.3
 def get_method(self):
 return self._method if self._method else Request.get_method(self)
 
class MitigatorException(BaseException):
 def __init__(self, message):
 self.message = message

 def __str__(self):
 return self.message
 
def parse_args():
 parser = argparse.ArgumentParser()
 parser.add_argument('--server', help='Mitigator host')
 parser.add_argument('--user', help='Mitigator login')
 parser.add_argument('--password', help='Mitigator password')
 parser.add_argument('--policy', help='policy ID (as shown in URL)')
 parser.add_argument('--no-verify', help='disable TLS certificate validation', action='store_true')
 parser.add_argument('-s', '--ip_src', required=True, help='IP address to block')
 parser.add_argument('-d', '--ip_dst', required=True, help='IP address to block')
 parser.add_argument('-t', '--time', required=True, help='block time in seconds', type=int)
 parser.add_argument('-p', '--port_src', help='Port src to block', type=int)
 parser.add_argument('-o', '--port_dst', help='Port dst to block', type=int)
 parser.add_argument('-P', '--protocol', help='Protocol to block')
 return parser.parse_args()

def make_request(hostname, uri, method=None, token=None, policy=None, data=None, parameters=None):

 url = f'https://{hostname}/api/v4/{uri}'

 if parameters is not None:
 url = f'https://{hostname}/api/v4/{uri}?{parameters}'

 request = RequestEx(url, method=method)
 if token is not None:
 request.add_header('X-Auth-Token', token)
 if data is not None:
 request.add_data(json.dumps(data))
 
 ctx = ssl.create_default_context()
 ctx.check_hostname = False
 ctx.verify_mode = ssl.CERT_NONE
 ctx = ctx
 try:
 response = urlopen(request, context=ctx)
 except HTTPError as e:
 try:
 raise MitigatorException(e.fp.read())
 except IOError:
 raise e
 return json.load(response)['data']

def block_traffic(hostname, token, options, policy_id):
#Вносим во временную блокировку ip адрес
 if policy_id:
 tbl_install = make_request(hostname, 
 f'tbl/items?policy={policy_id}&no_logs=false&source=1', 
 token=token, 
 method='POST', 
 data={
 "items": [
 {
 "prefix": options.ip_src,
 "ttl": options.time
 }
 ]
 } )
 tbl_install_check = make_request(hostname, f'tbl/list?policy={policy_id}', token=token, method='GET')

def search_policy(hostname, token, options):
 data_request = ''
 if options.ip_src is not None:
 data_request += f'src_address={options.ip_src}'
 if options.ip_dst is not None:
 data_request += f'&dst_address={options.ip_dst}'
 if options.port_src is not None:
 data_request += f'&src_port={options.port_src}'
 else:
 data_request += '&src_port=88' # обязательный параметр при запросе policy_id, но в политике может быть не настроен, подставляем произвольный
 if options.port_dst is not None:
 data_request += f'&dst_port={options.port_dst}'
 else:
 data_request += '&dst_port=88' # обязательный параметр при запросе policy_id, но в политике может быть не настроен, подставляем произвольный
 if options.protocol is not None:
 data_request += f'&protocol={options.protocol}' 
 else:
 data_request += '&protocol=TCP' # обязательный параметр при запросе policy_id, но в политике может быть не настроен, подставляем произвольный
 policy_request = make_request(hostname, f'policies/by_inbound_packet', token=token, method='GET', parameters=data_request)
 policy_id = policy_request.get('policy_id')
 return policy_id 

if __name__ == '__main__':
 options = parse_args()
 if options.server is not None:
 hostnames = [options.server]
 else:
 hostnames = ['x.x.x.x', 'y.y.y.y'] # Заменить список хостов 
 if options.user is not None:
 user = options.user
 else:
 user = 'user' # Заменить логин
 if options.password is not None:
 passwd = options.password
 else:
 passwd = 'passwd' # Заменить пароль
 
 for host in hostnames:
 url_prefix = f"https://{host}/api/v4"
 try:
 data = make_request(host, 'users/session', data={'username': user, 'password': passwd})
 token = data['token']
 if options.policy is None:
 policy_id = search_policy(host, token, options)
 block_traffic(host, token, options, policy_id)
 else:
 block_traffic(host, token, options, options.policy)
 except:
 continue