Запросы в KUMA (примеры)
Описание функций ClickHouse для работы с запросами: https://clickhouse.com/docs/ru/sql-reference/functions/
Базовые запросы
Типы событий в KUMA
Подсчет событий по полю
SELECT count(ID) as count_num, DeviceVendor
FROM `events`
GROUP BY DeviceVendor
ORDER BY count_num DESC LIMIT 250
Выполнение математических операций и сравнений
SELECT DeviceProduct, SourceUserName, round(sum(BytesIn)/1024, 2) as KiloBytes
FROM `events`
WHERE BytesIn > '0' OR BytesOut > '0'
GROUP by SourceUserName, DeviceProduct
ORDER BY KiloBytes DESC LIMIT 250
Поиск событий по подстроке (регистрозависимый) с условием И
SELECT *
FROM `events`
WHERE DeviceHostName like '%serv%' AND DeviceProduct = 'Windows'
ORDER BY Timestamp DESC LIMIT 250
Поиск событий по подстроке (регистроНЕзависимый)
SELECT * FROM `events` WHERE DeviceEventCategory ilike '%auditing%' AND DeviceProduct = 'Windows'
ORDER BY Timestamp DESC LIMIT 250
Поиск по исходному событию
SELECT * FROM `events` WHERE Raw ilike '%technique%' ORDER BY Timestamp DESC LIMIT 250
Поиск событий по нескольким значениям, вместо OR можно использовать IN
SELECT * FROM `events` WHERE DeviceEventClassID IN ('BROKER_USERLOGGEDOUT', 'BROKER_USERLOGGEDIN')
AND DeviceProduct = 'Horizon' ORDER BY Timestamp DESC LIMIT 250
Поиск событий по подсети
SELECT * FROM `events` WHERE inSubnet(DeviceAddress, '10.68.85.70/32') ORDER BY Timestamp DESC LIMIT 250
Проверка работы обогащения
Обогащение событий Активами
SELECT * FROM `events` WHERE DeviceAssetID != '' OR SourceAssetID != '' OR DestinationAssetID != '' ORDER BY Timestamp DESC LIMIT 250
Обогащение событий с LDAP
SELECT * FROM `events` WHERE SourceAccountID != '' OR DestinationAccountID != '' ORDER BY Timestamp DESC LIMIT 250
Обогащение событий данными из TI
SELECT * FROM `events` where not TI="" ORDER BY Timestamp DESC LIMIT 250
Работа с Extra полем
Поиск событий по полю Extra содержащие ключ
SELECT * FROM `events` WHERE visitParamHas(Extra, 'memUsage') ORDER BY Timestamp DESC LIMIT 250
Поиск событий по полю Extra содержащие ключ с определенным значением
SELECT * FROM `events` WHERE visitParamExtractString(Extra, 'memUsage') = '61367'
ORDER BY Timestamp DESC LIMIT 250
SELECT * FROM `events` WHEREJSONExtractString(Extra, 'memUsage') = '61367'ORDER BY Timestamp DESC LIMIT 250
SELECT * FROM `events` WHERE JSONExtractString(Extra, 'memUsage') = '61367'
ORDER BY Timestamp DESC LIMIT 250
Поиск событий по полю Extra НЕ содержащие ключ с определенным значением
SELECT * FROM `events` WHERE visitParamHas(Extra, 'memUsage') AND NOT visitParamExtractString(Extra, 'memUsage') = '61367' ORDER BY Timestamp DESC LIMIT 250
Работа со временем
Timestamp по умолчанию в формате epoch time числа с милисекундами (UnixTimestamp)
Список тайм зон: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
Задание таймзоны
SELECT Timestamp, fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')
as NormTime, DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250
Указание своего формата времени
SELECT Timestamp,formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S')as NormTime, DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250
Указание Ссвойего формата времени с таймзоной
SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S', 'Europe/Moscow') as NormTime,
DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250
Свой формат времени с таймзоной
SELECT Timestamp,
formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S', 'Europe/Moscow') as NormTime,
DeviceProduct, Name, Message
FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250
События в "рабочее время" с 9 до 18
SELECT Timestamp,
formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S', 'Europe/Moscow') as NormTime,
DeviceProduct, Name, Message
FROM `events`
WHERE DeviceProduct = 'EDR' AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) >= 9 AND
toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) < 18
ORDER BY Timestamp ASC LIMIT 250
Подсчет события по фильтру за период вчерашнего дня с 00 до 04 часов с группировкой по SourceAddress
SELECT count(ID) as Count_Num, SourceAddress, groupArray(fromUnixTimestamp64Milli(Timestamp)) as time_stm
FROM `events`
WHERE DeviceEventClassID = '4624' AND DeviceProduct = 'Windows' AND
Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000 AND
Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 4 HOUR)*1000
GROUP BY SourceAddress LIMIT 250
Подсчет уникальных адресов посетителей kb.kuma-community.ru
SELECT count(DISTINCT SourceAddress) as uniqSrcIP
FROM `events`
WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access'
AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb'
AND not inSubnet(SourceAddress, '91.103.66.0/24')
Можно использовать аналогичную более быструю функцию:
SELECT uniq(SourceAddress) as uniqSrcIP
FROM `events`
WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access'
AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb'
AND not inSubnet(SourceAddress, '91.103.66.0/24')
Подсчет
Муникальных адресожнв посетителей kb.kuma-community.ru по использоватьдням
аналогичную более быструю функцию:
SELECT
uniq(count(DISTINCT SourceAddress) asuniqSrcIP`metric`, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y', 'Europe/Moscow') as value FROM `events` WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access' AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb' AND not inSubnet(SourceAddress, '91.103.66.0/24')
Подсчет уникальных адресов посетителей kb.kuma-community.ru по дням
SELECT count(DISTINCT SourceAddress) as `metric`,formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y', 'Europe/Moscow') as valueFROM `events` WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access' AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb' AND not inSubnet(SourceAddress, '91.103.66.0/24')GROUP BY value
Подсчет среднего количесва переданных байт посетителей kb.kuma-community.ru по источнику IP c 00:00 по 08:00
SELECT avg(toInt32(BytesOut)), SourceAddress FROM `events` WHERE BytesOut!= 0 AND Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000 AND Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 8 HOUR)*1000 GROUP BY SourceAddress DESC LIMIT 250
Экстра запросы
Склейка по времени подключений пользователей с одного адреса и на один VPN сервер
SELECT SourceUserName, SourceAddress, SourceHostName, groupArray(FlexString1) as time
FROM `events`
whereWHERE SourceProcessName = 'Create session'
GROUP BY SourceUserName, SourceAddress, SourceHostName LIMIT 10
Обрезка доменов до второго уровня и условие с несколькими запросами
SELECT count(ID) as cnt, cutToFirstSignificantSubdomain(DestinationHostName) as dstH
FROM `events`
WHERE DeviceCustomString1 = 'Q' AND DestinationProcessName = 'DNS'
AND DeviceCustomString5 IN ('A', 'AAAA', 'HTTPS')
GROUP BY dstH
ORDER BY cnt DESC LIMIT 50
Группировка событий по категории из TI (Regex)
SELECT count(ID) as cnt, extract(TI, '.+category\"\:\"([^"]+).+') as category
FROM `events`
WHERE TI !=''
GROUP BY category
ORDER BY cnt DESC
Найти и вывести все базовые события конкретного корреляционного события по его ID
SELECT *
FROM `events`
WHERE ID IN (
SELECT arrayJoin(splitByChar(',', replaceRegexpAll(JSON_QUERY(BaseEvents, '$[*].ID'), '\\[|\\]|"| ', ''))) as TestID
FROM `events`
WHERE (Type = 3) AND (ID = '25e4eae3-ad6d-4114-b99e-019de3574d16'019de3574d16')
)