Интеграция KUMA с KSC
ОписаниеДля функций ClickHouse для работыинтеграции с запросами: https://clickhouse.com/docs/ru/sql-reference/functions/
Базовые запросы
Типы событийKSC в KUMA
необходимо заранее Подсчетсоздать событийучетную позапись полю
(внутреннего
KSC:SELECTпользователя) count(ID)в as count_num, DeviceVendor FROM `events` GROUP BY DeviceVendor ORDER BY count_num DESC LIMIT 250
Выполнение математических операций и сравнений
SELECT DeviceProduct, SourceUserName, round(sum(BytesIn)/1024, 2) as KiloBytes FROM `events` WHERE BytesIn > '0' OR BytesOut > '0' GROUP by SourceUserName, DeviceProduct ORDER BY KiloBytes DESC LIMIT 250
Поиск событий по подстроке (регистрозависимый) с условием И
SELECT * FROM `events` WHERE DeviceHostName like '%serv%' AND DeviceProduct = 'Windows' ORDER BY Timestamp DESC LIMIT 250
Поиск событий по подстроке (регистроНЕзависимый)
SELECT * FROM `events` WHERE DeviceEventCategory ilike '%auditing%' AND DeviceProduct = 'Windows' ORDER BY Timestamp DESC LIMIT 250
Поиск событий по нескольким значениям, вместо OR можно использовать IN
SELECT * FROM `events` WHERE DeviceEventClassID IN ('BROKER_USERLOGGEDOUT', 'BROKER_USERLOGGEDIN') AND DeviceProduct = 'Horizon' ORDER BY Timestamp DESC LIMIT 250
Поиск событий по подсети
SELECT * FROM `events` WHERE inSubnet(DeviceAddress, '10.68.85.70/32') ORDER BY Timestamp DESC LIMIT 250
Проверка работы обогащения
Обогащение событий Активами
SELECT * FROM `events` WHERE DeviceAssetID != '' OR SourceAssetID != '' OR DestinationAssetID != '' ORDER BY Timestamp DESC LIMIT 250
Обогащение событий с LDAP
SELECT * FROM `events` WHERE SourceAccountID != '' OR DestinationAccountID != '' ORDER BY Timestamp DESC LIMIT 250
Обогащение событий данными из TI
SELECT * FROM `events` where not TI="" ORDER BY Timestamp DESC LIMIT 250
Работа с Extra полем
Поиск событий по полю Extra содержащие ключ
SELECT * FROM `events` WHERE visitParamHas(Extra, 'memUsage') ORDER BY Timestamp DESC LIMIT 250
Поиск
Минимальные событийразрешения для интеграции с KSC такие:
- Access objects regardless of their ACLs — позволяет просто импортировать активы
- Management of administration groups — позволяет перемещать активы между группами в KSC из интерфейса KUMA
- Basic functionality на KES — позволяет создавать и запускать задачи на хостах
На стороне KUMA необходимо указать адрес и порт 13299 (этот порт используется по полюумолчанию), Extraа содержащиетакже ключУЗ со определеннымкоторой значениемговорилось
SELECT * FROM `events` WHERE visitParamExtractString(Extra, 'memUsage') = '61367' ORDER BY Timestamp DESC LIMIT 250
выше.SELECT * FROM `events` WHERE JSONExtractString(Extra, 'memUsage') = '61367' ORDER BY Timestamp DESC LIMIT 250
При
Поиск событий по полю Extra НЕ содержащие ключ с определенным значением
SELECT * FROM `events` WHERE visitParamHas(Extra, 'memUsage') AND NOT visitParamExtractString(Extra, 'memUsage') = '61367' ORDER BY Timestamp DESC LIMIT 250
Работа со временем
Timestamp по умолчаниюнажатии в формате epoch time числа с милисекундами (UnixTimestamp)
Список тайм зон: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
Задание таймзоны
SELECT Timestamp, fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow') as NormTime, DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250
Указание своего формата времени
SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S') as NormTime, DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250
Свой формат времени с таймзоной
SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S', 'Europe/Moscow') as NormTime, DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250
События в "рабочее время" с 9 до 18
SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S', 'Europe/Moscow') as NormTime,DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) >= 9 AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) < 18 ORDER BY Timestamp ASC LIMIT 250
Подсчет события по фильтру за период вчерашнего дня с 00 до 04 часов с группировкой по SourceAddress
SELECT count(ID) as Count_Num, SourceAddress, groupArray(fromUnixTimestamp64Milli(Timestamp)) as time_stm FROM `events` WHERE DeviceEventClassID = '4624' AND DeviceProduct = 'Windows' AND Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000 AND Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 4 HOUR)*1000 GROUP BY SourceAddress LIMIT 250
Фиксируются таймштемпы входа от одного источника с подсчетом количества.
Экстра запросы
Склейка по времени подключений пользователей с одного адреса иKUMA на одинкнопку VPN сервер
SELECT SourceUserName, SourceAddress, SourceHostName, groupArray(FlexString1) as time FROM `events` where SourceProcessName = 'Create session' GROUP BY SourceUserName, SourceAddress, SourceHostName LIMIT 10
Обрезка доменов до второго уровня и условие с несколькими запросами
SELECT count(ID) as cnt, cutToFirstSignificantSubdomain(DestinationHostName) as dstH FROM `events` WHERE DeviceCustomString1 = 'Q' AND DestinationProcessName = 'DNS' AND DeviceCustomString5 IN ('A'"Сохранить", 'AAAA',не 'HTTPS')должно GROUPвсплавать BYсообщений dstHоб ORDERошибке, BYэто cntзначит, DESC LIMIT 50
Найти и вывестичто все базовые события конкретного корреляционного события по его ID
ОК.SELECT * FROM `events` WHERE ID IN (SELECT arrayJoin(splitByChar(',', replaceRegexpAll(JSON_QUERY(BaseEvents, '$[*].ID'), '\\[|\\]|"| ', ''))) as TestID FROM `events` WHERE (Type = 3) AND (ID = '25e4eae3-ad6d-4114-b99e-019de3574d16'))