Запросы в KUMA (примеры)
Описание функций ClickHouse для работы с запросами: https://clickhouse.com/docs/ru/sql-reference/functions/
Запрос из интерфейса пробрасывается в БД с добавлением границ временного промежутка и выбранных тенантов, пример: ... AND (Timestamp >= 1715689595208 AND Timestamp <= 1715689895208) AND (TenantID IN ('a1fbde7a-76d3-4bbc-a769-82126b41b56f')) ORDER BY ...
Базовые запросы (поиск событий)
Типы событий в KUMA
Подсчет событий по полю
SELECT count(ID) as count_num, DeviceVendor
FROM `events`
GROUP BY DeviceVendor
ORDER BY count_num DESC LIMIT 250
Выполнение математических операций и сравнений
SELECT DeviceProduct, SourceUserName, round(sum(BytesIn)/1024, 2) as KiloBytes
FROM `events`
WHERE BytesIn > '0' OR BytesOut > '0'
GROUP by SourceUserName, DeviceProduct
ORDER BY KiloBytes DESC LIMIT 250
Иногда бОльшую производительность дает условие со скобками WHERE (BytesIn > '0' OR BytesOut > '0')
По подстроке (регистрозависимый) с условием И
SELECT *
FROM `events`
WHERE DeviceHostName like '%serv%' AND DeviceProduct = 'Windows'
ORDER BY Timestamp DESC LIMIT 250
По подстроке (регистроНЕзависимый)
SELECT *
FROM `events`
WHERE DeviceEventCategory ilike '%auditing%' AND DeviceProduct = 'Windows'
ORDER BY Timestamp DESC LIMIT 250
По исходному / сырому событию
SELECT *
FROM `events`
WHERE Raw ilike '%technique%'
ORDER BY Timestamp DESC LIMIT 250
По нескольким значениям, вместо OR можно использовать IN
SELECT *
FROM `events`
WHERE DeviceEventClassID IN ('BROKER_USERLOGGEDOUT', 'BROKER_USERLOGGEDIN')
AND DeviceProduct = 'Horizon'
ORDER BY Timestamp DESC LIMIT 250
По подсети
SELECT *
FROM `events`
WHERE inSubnet(DeviceAddress, '10.68.85.70/32')
ORDER BY Timestamp DESC LIMIT 250
Внешние IPv4 адреса
SELECT count(ID) as count, DestinationAddress
FROM `events`
WHERE NOT empty(DestinationAddress) AND NOT (inSubnet(DestinationAddress, '10.0.0.0/8') OR inSubnet(DestinationAddress, '172.16.0.0/12') OR inSubnet(DestinationAddress, '192.168.0.0/16') OR inSubnet(DestinationAddress, '127.0.0.0/8'))
AND NOT isIPv6String(DestinationAddress)
GROUP BY DestinationAddress
ORDER BY count DESC LIMIT 250
По отсутствующему полю
SELECT *
FROM `events`
WHERE empty(DeviceEventClassID) AND Raw ilike '%backdoor_user%'
ORDER BY Timestamp DESC LIMIT 250
По регулярному выражению
SELECT *
FROM `events`
WHERE match(DestinationUserName, '^\w+\-\d\$$')
ORDER BY Timestamp DESC LIMIT 250
Проверка работы обогащения
Обогащение событий Активами
SELECT *
FROM `events`
WHERE (DeviceAssetID != '' OR SourceAssetID != '' OR DestinationAssetID != '')
ORDER BY Timestamp DESC LIMIT 250
Обогащение событий с LDAP
SELECT *
FROM `events`
WHERE (SourceAccountID != '' OR DestinationAccountID != '')
ORDER BY Timestamp DESC LIMIT 250
Обогащение событий данными из TI
SELECT *
FROM `events`
WHERE NOT TI=""
ORDER BY Timestamp DESC LIMIT 250
Работа с Extra полем
По полю Extra содержащие ключ
SELECT *
FROM `events`
WHERE visitParamHas(Extra, 'memUsage')
ORDER BY Timestamp DESC LIMIT 250
По полю Extra содержащие ключ с определенным значением
SELECT *
FROM `events`
WHERE visitParamExtractString(Extra, 'memUsage') = '61367'
ORDER BY Timestamp DESC LIMIT 250
SELECT *
FROM `events`
WHERE JSONExtractString(Extra, 'memUsage') = '61367'
ORDER BY Timestamp DESC LIMIT 250
По полю Extra НЕ содержащие ключ с определенным значением
SELECT *
FROM `events`
WHERE visitParamHas(Extra, 'memUsage')
AND NOT visitParamExtractString(Extra, 'memUsage') = '61367'
ORDER BY Timestamp DESC LIMIT 250
Работа со временем
Timestamp по умолчанию в формате epoch time числа с милисекундами (UnixTimestamp)
Список тайм зон: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
Задание таймзоны
SELECT Timestamp, fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow') as NormTime,
DeviceProduct, Name, Message
FROM `events`
WHERE DeviceProduct = 'EDR'
ORDER BY Timestamp DESC LIMIT 250
Указание своего формата времени
SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%i:%S') as NormTime,
DeviceProduct, Name, Message
FROM `events`
WHERE DeviceProduct = 'EDR'
ORDER BY Timestamp DESC LIMIT 250
Свой формат времени с таймзоной
SELECT Timestamp,
formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%i:%S', 'Europe/Moscow') as NormTime,
DeviceProduct, Name, Message
FROM `events`
WHERE DeviceProduct = 'EDR'
ORDER BY Timestamp DESC LIMIT 250
События в "рабочее время" с 9 до 18
SELECT Timestamp,
formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%i:%S', 'Europe/Moscow') as NormTime,
DeviceProduct, Name, Message
FROM `events`
WHERE DeviceProduct = 'EDR' AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) >= 9
AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) < 18
ORDER BY Timestamp ASC LIMIT 250
Подсчет события по фильтру за период вчерашнего дня с 00 до 04 часов с группировкой по SourceAddress
SELECT count(ID) as Count_Num, SourceAddress, groupArray(fromUnixTimestamp64Milli(Timestamp)) as time_stm
FROM `events`
WHERE DeviceEventClassID = '4624' AND DeviceProduct = 'Windows'
AND Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000
AND Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 4 HOUR)*1000
GROUP BY SourceAddress LIMIT 250
Подсчет уникальных адресов посетителей kb.kuma-community.ru
SELECT count(DISTINCT SourceAddress) as uniqSrcIP
FROM `events`
WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access'
AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb'
AND not inSubnet(SourceAddress, '91.103.66.0/24')
Можно использовать аналогичную более быструю функцию:
SELECT uniq(SourceAddress) as uniqSrcIP
FROM `events`
WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access'
AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb'
AND not inSubnet(SourceAddress, '91.103.66.0/24')
Подсчет уникальных адресов посетителей kb.kuma-community.ru по дням
SELECT count(DISTINCT SourceAddress) as `metric`,
formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y', 'Europe/Moscow') as value
FROM `events`
WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access'
AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb'
AND not inSubnet(SourceAddress, '91.103.66.0/24')
GROUP BY value
Подсчет среднего количесва переданных байт посетителей kb.kuma-community.ru по источнику IP c 00:00 по 08:00
SELECT avg(toInt32(BytesOut)), SourceAddress
FROM `events`
WHERE BytesOut!= 0 AND Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000
AND Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 8 HOUR)*1000
GROUP BY SourceAddress DESC LIMIT 250
Отображение количества переданных и принятых байт по внутренним адресам более 1 Гб
SELECT StartTime , EndTime , SourceAddress AS `SOURSE ADDRESS`, DestinationPort AS `TO PORT`, ApplicationProtocol AS `APPLICATION`, DeviceCustomString1 AS `RULE`, formatReadableSize(BytesIn) AS `SENT`, formatReadableSize(BytesOut) AS `RECEIVED`, formatReadableSize(FlexNumber1) AS `TOTAL`
FROM `events`
WHERE FlexNumber1 > 1000000000
GROUP BY StartTime, EndTime, SourceAddress, DestinationPort, ApplicationProtocol, DeviceCustomString1, BytesIn, BytesOut, FlexNumber1
ORDER BY `EndTime` DESC LIMIT 250
Экстра запросы
Склейка по времени подключений пользователей с одного адреса и на один VPN сервер
SELECT SourceUserName, SourceAddress, SourceHostName, groupArray(FlexString1) as time
FROM `events`
WHERE SourceProcessName = 'Create session'
GROUP BY SourceUserName, SourceAddress, SourceHostName LIMIT 10
Склейка различных адресов подключений от одного пользователя на VPN сервере
Оператор HAVING доступен с версии KUMA 3.0
SELECT uniq(SourceAddress) as "Количество уникальных адресов", DestinationUserName as "Имя пользователя", groupUniqArray(SourceAddress) as "Список адресов"
FROM `events`
WHERE DeviceProduct = 'Ngate' AND Name = 'Create session'
GROUP BY "Имя пользователя"
HAVING "Количество уникальных адресов" > 1
ORDER BY "Количество уникальных адресов" DESC
LIMIT 10
Обрезка доменов до второго уровня и условие с несколькими запросами
SELECT count(ID) as cnt, cutToFirstSignificantSubdomain(DestinationHostName) as dstH
FROM `events`
WHERE DeviceCustomString1 = 'Q' AND DestinationProcessName = 'DNS'
AND DeviceCustomString5 IN ('A', 'AAAA', 'HTTPS')
GROUP BY dstH
ORDER BY cnt DESC LIMIT 50
Обрезка доменов до второго уровня и исключение IP адресов с помощью регулярного выражения
SELECT count(ID) as `metric`, cutToFirstSignificantSubdomain(RequestUrl) as `value`,
match(`value`, '^([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$') AS is_valid_domain
FROM `events`
WHERE DeviceProduct = 'UTM' AND FlexString2 = 'Instant Messaging' AND Type != 3 AND is_valid_domain = 1
GROUP BY `value` ORDER BY `metric` DESC LIMIT 250
Группировка событий по категории из TI (Regex)
SELECT count(ID) as cnt, extract(TI, '.+category\"\:\"([^"]+).+') as category
FROM `events`
WHERE TI !=''
GROUP BY category
ORDER BY cnt DESC
Все базовые события конкретного корреляционного события по его ID
SELECT *
FROM `events`
WHERE ID IN (
SELECT arrayJoin(splitByChar(',', replaceRegexpAll(JSON_QUERY(BaseEvents, '$[*].ID'), '\\[|\\]|"| ', ''))) as TestID
FROM `events`
WHERE (Type = 3) AND (ID = '25e4eae3-ad6d-4114-b99e-019de3574d16')
)
Дерево процесса
SELECT
b.DeviceHostName AS HostName,
b.SourceAccountID AS AccountID,
b.SourceUserName AS UserName,
a.DeviceCustomString3 AS GrandParentProcessID,
a.SourceProcessName AS GrandParentProcessName,
a.DeviceCustomString5 AS ParentProcessID,
a.DestinationProcessName AS ParentProcessName,
b.DeviceCustomString5 AS ProcessID,
b.DestinationProcessName AS ProcessName,
concat(a.SourceProcessName, ' -> ', a.DestinationProcessName, ' -> ', b.DestinationProcessName) AS ProcessTree
FROM `events` AS a
INNER JOIN
(
SELECT *
FROM `events`
WHERE DeviceEventClassID='4688'
) AS b
ON a.DeviceCustomString5 = b.DeviceCustomString3
WHERE DeviceEventClassID='4688'
Среднее время сессии Windows пользователей за 24 часа
Рассматривается запрос на основе событий Windows по пользователям (DestinationUserName) событиям входа (EventID 4624) и выхода (EventID 4634) с расчетом среднего времени сесии пользователя за последние 24 часа
SELECT
login_events.DestinationUserName AS destination_user_name,
round(AVG(logout_events.logout_time - login_events.login_time)/1000) AS avg_time_diff_s,
COUNT(DISTINCT login_events.login_time) AS total_logins,
COUNT(DISTINCT logout_events.logout_time) AS total_logouts,
concat(
toString(floor(avg_time_diff_s / 86400)), ' days, ',
toString(floor((avg_time_diff_s % 86400) / 3600)), ' hours, ',
toString(floor((avg_time_diff_s % 3600) / 60)), ' minutes, ',
toString(avg_time_diff_s % 60), ' seconds'
) AS human_readable_diff
FROM
(SELECT
DestinationUserName,
toUnixTimestamp(EndTime) AS login_time,
FlexString1 AS logon_id
FROM `events`
WHERE DeviceEventClassID = '4624'
AND EndTime >= now('Europe/Moscow') - INTERVAL 24 HOUR
AND DestinationUserName NOT LIKE '%$%') AS login_events
INNER JOIN
(SELECT
DestinationUserName,
toUnixTimestamp(EndTime) AS logout_time,
FlexString1 AS logon_id
FROM `events`
WHERE DeviceEventClassID = '4634'
AND EndTime >= now('Europe/Moscow') - INTERVAL 24 HOUR
AND DestinationUserName NOT LIKE '%$%') AS logout_events
ON login_events.DestinationUserName = logout_events.DestinationUserName
AND logout_events.logon_id = login_events.logon_id
WHERE logout_events.logout_time >= login_events.login_time
GROUP BY login_events.DestinationUserName
ORDER BY avg_time_diff_s DESC
LIMIT 100
Несмотря на ошибку запрос выполняется корректно:
Частота и отношение неуспешных входов к успешным в Windows
SELECT
count(CASE WHEN DeviceEventClassID = '4625' THEN 1 END) as `Failed logins`,
count(CASE WHEN DeviceEventClassID = '4624' THEN 1 END) as `Success logins`,
`Failed logins`/`Success logins` as `Ratio`
FROM 'events'
HAVING `Ratio` > 0.03
Или
SELECT countIf(DeviceEventClassID = '4625') as F, countIf(DeviceEventClassID = '4624') as S
FROM 'events'
LIMIT 250
Работа с кастомными полями (SA. NA.)
Вхождение элемента по полю
Поиск проискодит по точному совпадению с элементом
SELECT *
FROM `events`
WHERE has(SA.user_groups, 'system:serviceaccounts')
ORDER BY Timestamp DESC LIMIT 250
Вхождение по подстроке элемента по полю (где более 2 значений в массиве)
Счет в массиве начинается с нуля
SELECT *
FROM `events`
WHERE length(arrayFilter(x -> x LIKE '%serviceaccounts%', SA.user_groups)) > 1
ORDER BY Timestamp DESC LIMIT 250
No Comments