Запросы в KUMA (примеры)
Описание функций ClickHouse для работы с запросами: https://clickhouse.com/docs/ru/sql-reference/functions/
Запрос из интерфейса пробрасывается в БД с добавлением границ временного промежутка и выбранных тенантов, пример: ... AND (Timestamp >= 1715689595208 AND Timestamp <= 1715689895208) AND (TenantID IN ('a1fbde7a-76d3-4bbc-a769-82126b41b56f')) ORDER BY ...
Базовые запросы
Типы событий в KUMA
Подсчет событий по полю
SELECT count(ID) as count_num, DeviceVendor
FROM `events`
GROUP BY DeviceVendor
ORDER BY count_num DESC LIMIT 250Выполнение математических операций и сравнений
SELECT DeviceProduct, SourceUserName, round(sum(BytesIn)/1024, 2) as KiloBytes 
FROM `events` 
WHERE BytesIn > '0' OR BytesOut > '0'
GROUP by SourceUserName, DeviceProduct
ORDER BY KiloBytes DESC LIMIT 250Иногда бОльшую производительность дает условие со скобками WHERE (BytesIn > '0' OR BytesOut > '0')
Поиск событий по подстроке (регистрозависимый) с условием И
SELECT *
FROM `events`
WHERE DeviceHostName like '%serv%' AND DeviceProduct = 'Windows'
ORDER BY Timestamp DESC LIMIT 250Поиск событий по подстроке (регистроНЕзависимый)
SELECT *
FROM `events`
WHERE DeviceEventCategory ilike '%auditing%' AND DeviceProduct = 'Windows'
ORDER BY Timestamp DESC LIMIT 250Поиск по исходному событию
SELECT *
FROM `events`
WHERE Raw ilike '%technique%'
ORDER BY Timestamp DESC LIMIT 250Поиск событий по нескольким значениям, вместо OR можно использовать IN
SELECT *
FROM `events`
WHERE DeviceEventClassID IN ('BROKER_USERLOGGEDOUT', 'BROKER_USERLOGGEDIN')
AND DeviceProduct = 'Horizon'
ORDER BY Timestamp DESC LIMIT 250Поиск событий по подсети
SELECT *
FROM `events`
WHERE inSubnet(DeviceAddress, '10.68.85.70/32')
ORDER BY Timestamp DESC LIMIT 250Проверка работы обогащения
Обогащение событий Активами
SELECT *
FROM `events`
WHERE DeviceAssetID != '' OR SourceAssetID != '' OR DestinationAssetID != ''
ORDER BY Timestamp DESC LIMIT 250Обогащение событий с LDAP 
SELECT *
FROM `events`
WHERE SourceAccountID != '' OR DestinationAccountID != ''
ORDER BY Timestamp DESC LIMIT 250Обогащение событий данными из  TI
SELECT *
FROM `events`
WHERE NOT TI=""
ORDER BY Timestamp DESC LIMIT 250Работа с Extra полем
Поиск событий по полю Extra содержащие ключ
SELECT *
FROM `events`
WHERE visitParamHas(Extra, 'memUsage')
ORDER BY Timestamp DESC LIMIT 250Поиск событий по полю Extra содержащие ключ с определенным значением
SELECT *
FROM `events`
WHERE visitParamExtractString(Extra, 'memUsage') = '61367'
ORDER BY Timestamp DESC LIMIT 250SELECT *
FROM `events`
WHERE JSONExtractString(Extra, 'memUsage') = '61367'
ORDER BY Timestamp DESC LIMIT 250Поиск событий по полю Extra НЕ содержащие ключ с определенным значением
SELECT *
FROM `events`
WHERE visitParamHas(Extra, 'memUsage') 
AND NOT visitParamExtractString(Extra, 'memUsage') = '61367' 
ORDER BY Timestamp DESC LIMIT 250Работа со временем
Timestamp по умолчанию в формате epoch time числа  с милисекундами (UnixTimestamp)
Список тайм зон: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
Задание таймзоны
SELECT Timestamp, fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow') as NormTime,
DeviceProduct, Name, Message
FROM `events` 
WHERE DeviceProduct = 'EDR'
ORDER BY Timestamp DESC LIMIT 250Указание своего формата времени
SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S') as NormTime,
DeviceProduct, Name, Message
FROM `events`
WHERE DeviceProduct = 'EDR'
ORDER BY Timestamp DESC LIMIT 250Свой формат времени с таймзоной
SELECT Timestamp,
formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S', 'Europe/Moscow') as NormTime,
DeviceProduct, Name, Message
FROM `events`
WHERE DeviceProduct = 'EDR'
ORDER BY Timestamp DESC LIMIT 250События в "рабочее время" с 9 до 18
SELECT Timestamp,
formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S', 'Europe/Moscow') as NormTime,
DeviceProduct, Name, Message 
FROM `events` 
WHERE DeviceProduct = 'EDR' AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) >= 9
AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) < 18 
ORDER BY Timestamp ASC LIMIT 250Подсчет события по фильтру за период вчерашнего дня с 00 до 04 часов с группировкой по SourceAddress
SELECT count(ID) as Count_Num, SourceAddress, groupArray(fromUnixTimestamp64Milli(Timestamp)) as time_stm
FROM `events` 
WHERE DeviceEventClassID = '4624' AND DeviceProduct = 'Windows'
AND Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000
AND Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 4 HOUR)*1000
GROUP BY SourceAddress LIMIT 250Подсчет уникальных адресов посетителей kb.kuma-community.ru
SELECT count(DISTINCT SourceAddress) as uniqSrcIP 
FROM `events` 
WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access'
AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb' 
AND not inSubnet(SourceAddress, '91.103.66.0/24')Можно использовать аналогичную более быструю функцию:
SELECT uniq(SourceAddress) as uniqSrcIP
FROM `events`
WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access' 
AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb'
AND not inSubnet(SourceAddress, '91.103.66.0/24') Подсчет уникальных адресов посетителей kb.kuma-community.ru по дням
SELECT count(DISTINCT SourceAddress) as `metric`,
formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y', 'Europe/Moscow') as value
FROM `events`
WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access'
AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb'
AND not inSubnet(SourceAddress, '91.103.66.0/24')
GROUP BY valueПодсчет среднего количесва переданных байт посетителей kb.kuma-community.ru по источнику IP c 00:00 по 08:00
SELECT avg(toInt32(BytesOut)), SourceAddress
FROM `events`
WHERE BytesOut!= 0 AND Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000
AND Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 8 HOUR)*1000
GROUP BY SourceAddress DESC LIMIT 250Экстра запросы
Склейка по времени подключений пользователей с одного адреса и на один VPN сервер
SELECT SourceUserName, SourceAddress, SourceHostName, groupArray(FlexString1) as time
FROM `events`
WHERE SourceProcessName = 'Create session'
GROUP BY SourceUserName, SourceAddress, SourceHostName LIMIT 10Склейка различных адресов подключений от одного пользователя на VPN сервере
SELECT uniq(SourceAddress) as "Количество уникальных адресов", DestinationUserName  as "Имя пользователя", groupUniqArray(SourceAddress) as  "Список адресов"
FROM `events`
WHERE DeviceProduct = 'Ngate' AND Name = 'Create session'
GROUP BY "Имя пользователя"
HAVING "Количество уникальных адресов" > 1
ORDER BY "Количество уникальных адресов" DESC
LIMIT 10Обрезка доменов до второго уровня и условие с несколькими запросами
SELECT count(ID) as cnt, cutToFirstSignificantSubdomain(DestinationHostName) as dstH
FROM `events`
WHERE DeviceCustomString1 = 'Q' AND DestinationProcessName = 'DNS'
AND DeviceCustomString5 IN ('A', 'AAAA', 'HTTPS')
GROUP BY dstH
ORDER BY cnt DESC LIMIT 50Группировка событий по категории из TI (Regex)
SELECT count(ID) as cnt, extract(TI, '.+category\"\:\"([^"]+).+') as category
FROM `events`
WHERE TI !=''
GROUP BY category
ORDER BY cnt DESCНайти и вывести все базовые события конкретного корреляционного события по его ID
SELECT *
FROM `events`
WHERE ID IN (
  SELECT arrayJoin(splitByChar(',', replaceRegexpAll(JSON_QUERY(BaseEvents, '$[*].ID'), '\\[|\\]|"| ', ''))) as TestID
  FROM `events`
  WHERE (Type = 3) AND (ID = '25e4eae3-ad6d-4114-b99e-019de3574d16')
)Запросы к системным таблицам
У ClickHouse есть системные таблицы (https://clickhouse.com/docs/en/operations/system-tables). Одна из них - query_log (https://clickhouse.com/docs/en/operations/system-tables/query_log), содержит интересную информацию о запросах.
Чтобы обратиться к данным таблицами можно использовать клиент ClickHouse, который располагается по пути /opt/kaspersky/kuma/clickhouse/bin/client.sh
Можно заходить в консоль ClickHouse с помощью данного клиента, а также выполнять запросы через аргументы клиента, а также формировать запросы с помощью curl. В примерах ниже будет рассмотрен именно первый способ.
Вывод запросов с сортировкой по длительности
В примере ниже представлен запрос с сортировкой по длительности выполнения запросов за конкретный день
SELECT *
FROM system.query_log
WHERE query_kind = 'Select' AND current_database = 'kuma' AND event_date = '2024-01-22' 
AND query_duration_ms != 0
ORDER BY query_duration_ms DESC LIMIT 10Вывод запросов, завершившихся ошибкой
SELECT *
FROM system.query_log
WHERE type = 'ExceptionWhileProcessing' AND current_database = 'kuma'
LIMIT 10 
                






















