Skip to main content

Запросы в KUMA (примеры)

Описание функций ClickHouse для работы с запросами: https://clickhouse.com/docs/ru/sql-reference/functions/ 

Запрос из интерфейса пробрасывается в БД с добавлением границ временного промежутка и выбранных тенантов, пример: ... AND (Timestamp >= 1715689595208 AND Timestamp <= 1715689895208) AND (TenantID IN ('a1fbde7a-76d3-4bbc-a769-82126b41b56f')) ORDER BY ...

Время в событиях

Если в событии присутствует информация о таймзоне (deviceTimeZone) или корректно парсится timestamp события в нормализаторе, где получаем ±hh:mm и маппится это значение в поле DeviceTimeZone - событие поступает в KUMA со значением этой таймзоны

Если в событии нет deviceTimeZone или время из события не содержит таймзону, которую можно распарсить, то для такого события устанавливается таймзона сервера на котором установлен коллектор.

Таймзону еще можно поправить следующим образом:

    создать правило обогащения типа таймзона и скорректировать временную зону; в поисковый запрос добавить константу с таймзоной (пример, SELECT '+0300' as  deviceTimeZone), можно так же указать таймзону формата +03:00 или +03.

    Базовые запросы (поиск событий)


    Типы событий в KUMA

    image.png


    Подсчет событий по полю

    SELECT count(ID) as count_num, DeviceVendor
    FROM `events`
    GROUP BY DeviceVendor
    ORDER BY count_num DESC LIMIT 250

    image.png


    Выполнение математических операций и сравнений

    SELECT DeviceProduct, SourceUserName, round(sum(BytesIn)/1024, 2) as KiloBytes 
    FROM `events` 
    WHERE BytesIn > '0' OR BytesOut > '0'
    GROUP by SourceUserName, DeviceProduct
    ORDER BY KiloBytes DESC LIMIT 250

    Иногда бОльшую производительность дает условие со скобками WHERE (BytesIn > '0' OR BytesOut > '0')


    По подстроке (регистрозависимый) с условием И

    SELECT *
    FROM `events`
    WHERE DeviceHostName like '%serv%' AND DeviceProduct = 'Windows'
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    По подстроке (регистроНЕзависимый)

    SELECT *
    FROM `events`
    WHERE DeviceEventCategory ilike '%auditing%' AND DeviceProduct = 'Windows'
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    По исходному / сырому событию

    SELECT *
    FROM `events`
    WHERE Raw ilike '%technique%'
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    По нескольким значениям, вместо OR можно использовать IN

    SELECT *
    FROM `events`
    WHERE DeviceEventClassID IN ('BROKER_USERLOGGEDOUT', 'BROKER_USERLOGGEDIN')
    AND DeviceProduct = 'Horizon'
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    По подсети

    SELECT *
    FROM `events`
    WHERE inSubnet(DeviceAddress, '10.68.85.70/32')
    ORDER BY Timestamp DESC LIMIT 250

    image.png

    Внешние IPv4 адреса

    SELECT count(ID) as count, DestinationAddress 
    FROM `events` 
    WHERE NOT empty(DestinationAddress) AND NOT (inSubnet(DestinationAddress, '10.0.0.0/8') OR inSubnet(DestinationAddress, '172.16.0.0/12') OR inSubnet(DestinationAddress, '192.168.0.0/16') OR inSubnet(DestinationAddress, '127.0.0.0/8')) 
    AND NOT isIPv6String(DestinationAddress) 
    GROUP BY DestinationAddress 
    ORDER BY count DESC LIMIT 250

    image.png

    По отсутствующему полю

    SELECT *
    FROM `events`
    WHERE empty(DeviceEventClassID) AND Raw ilike '%backdoor_user%'
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    По регулярному выражению

    SELECT * 
    FROM `events` 
    WHERE match(DestinationUserName, '^\w+\-\d\$$')  
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    Проверка работы обогащения


    Обогащение событий Активами

    SELECT *
    FROM `events`
    WHERE (DeviceAssetID != '' OR SourceAssetID != '' OR DestinationAssetID != '')
    ORDER BY Timestamp DESC LIMIT 250

    Обогащение событий с LDAP 

    SELECT *
    FROM `events`
    WHERE (SourceAccountID != '' OR DestinationAccountID != '')
    ORDER BY Timestamp DESC LIMIT 250

    Обогащение событий данными из  TI

    SELECT *
    FROM `events`
    WHERE NOT TI=""
    ORDER BY Timestamp DESC LIMIT 250

    Работа с Extra полем


    По полю Extra содержащие ключ

    SELECT *
    FROM `events`
    WHERE visitParamHas(Extra, 'memUsage')
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    По полю Extra содержащие ключ с определенным значением

    SELECT *
    FROM `events`
    WHERE visitParamExtractString(Extra, 'memUsage') = '61367'
    ORDER BY Timestamp DESC LIMIT 250
    SELECT *
    FROM `events`
    WHERE JSONExtractString(Extra, 'memUsage') = '61367'
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    По полю Extra НЕ содержащие ключ с определенным значением

    SELECT *
    FROM `events`
    WHERE visitParamHas(Extra, 'memUsage') 
    AND NOT visitParamExtractString(Extra, 'memUsage') = '61367' 
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    Работа со временем 

    Timestamp по умолчанию в формате epoch time числа  с милисекундами (UnixTimestamp)

    Список тайм зон: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones 

    image.png


    Задание таймзоны

    SELECT Timestamp, fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow') as NormTime,
    DeviceProduct, Name, Message
    FROM `events` 
    WHERE DeviceProduct = 'EDR'
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    Указание своего формата времени

    SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%i:%S') as NormTime,
    DeviceProduct, Name, Message
    FROM `events`
    WHERE DeviceProduct = 'EDR'
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    Свой формат времени с таймзоной

    SELECT Timestamp,
    formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%i:%S', 'Europe/Moscow') as NormTime,
    DeviceProduct, Name, Message
    FROM `events`
    WHERE DeviceProduct = 'EDR'
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    События в "рабочее время" с 9 до 18

    SELECT Timestamp,
    formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%i:%S', 'Europe/Moscow') as NormTime,
    DeviceProduct, Name, Message 
    FROM `events` 
    WHERE DeviceProduct = 'EDR' AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) >= 9
    AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) < 18 
    ORDER BY Timestamp ASC LIMIT 250

    image.png


    Подсчет события по фильтру  за период вчерашнего дня с 00 до 04 часов с группировкой по SourceAddress

    SELECT count(ID) as Count_Num, SourceAddress, groupArray(fromUnixTimestamp64Milli(Timestamp)) as time_stm
    FROM `events` 
    WHERE DeviceEventClassID = '4624' AND DeviceProduct = 'Windows'
    AND Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000
    AND Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 4 HOUR)*1000
    GROUP BY SourceAddress LIMIT 250

    image.png


    Подсчет уникальных адресов посетителей kb.kuma-community.ru

    SELECT count(DISTINCT SourceAddress) as uniqSrcIP 
    FROM `events` 
    WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access'
    AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb' 
    AND not inSubnet(SourceAddress, '91.103.66.0/24')

    image.png

    Можно использовать аналогичную более быструю функцию:

    SELECT uniq(SourceAddress) as uniqSrcIP
    FROM `events`
    WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access' 
    AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb'
    AND not inSubnet(SourceAddress, '91.103.66.0/24') 

    Подсчет уникальных адресов посетителей kb.kuma-community.ru по дням

    SELECT count(DISTINCT SourceAddress) as `metric`,
    formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y', 'Europe/Moscow') as value
    FROM `events`
    WHERE Code = '200' AND DeviceCustomString2 != '-' AND DestinationServiceName = 'apache_access'
    AND DeviceCustomString3 = 'GET' AND DestinationHostName = 'kb'
    AND not inSubnet(SourceAddress, '91.103.66.0/24')
    GROUP BY value

    image.png


    Подсчет среднего количесва переданных байт посетителей kb.kuma-community.ru по источнику IP c 00:00 по 08:00

    SELECT avg(toInt32(BytesOut)), SourceAddress
    FROM `events`
    WHERE BytesOut!= 0 AND Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000
    AND Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 8 HOUR)*1000
    GROUP BY SourceAddress DESC LIMIT 250

    image.png

    Отображение количества переданных и принятых байт по внутренним адресам более 1 Гб

    SELECT StartTime , EndTime , SourceAddress AS `SOURSE ADDRESS`, DestinationPort AS `TO PORT`, ApplicationProtocol AS `APPLICATION`, DeviceCustomString1 AS `RULE`, formatReadableSize(BytesIn) AS `SENT`, formatReadableSize(BytesOut) AS `RECEIVED`, formatReadableSize(FlexNumber1) AS `TOTAL` 
    FROM `events` 
    WHERE FlexNumber1 > 1000000000 
    GROUP BY StartTime, EndTime, SourceAddress, DestinationPort, ApplicationProtocol, DeviceCustomString1, BytesIn, BytesOut, FlexNumber1 
    ORDER BY `EndTime` DESC LIMIT 250

    image.png


    Экстра запросы


    Склейка по времени подключений пользователей с одного адреса и на один VPN сервер

    SELECT SourceUserName, SourceAddress, SourceHostName, groupArray(FlexString1) as time
    FROM `events`
    WHERE SourceProcessName = 'Create session'
    GROUP BY SourceUserName, SourceAddress, SourceHostName LIMIT 10

    image.png


    Склейка различных адресов подключений от одного пользователя на VPN сервере

    Оператор HAVING доступен с версии KUMA 3.0

    SELECT uniq(SourceAddress) as "Количество уникальных адресов", DestinationUserName  as "Имя пользователя", groupUniqArray(SourceAddress) as  "Список адресов"
    FROM `events`
    WHERE DeviceProduct = 'Ngate' AND Name = 'Create session'
    GROUP BY "Имя пользователя"
    HAVING "Количество уникальных адресов" > 1
    ORDER BY "Количество уникальных адресов" DESC
    LIMIT 10

    image.png


    Обрезка доменов до второго уровня и условие с несколькими запросами

    SELECT count(ID) as cnt, cutToFirstSignificantSubdomain(DestinationHostName) as dstH
    FROM `events`
    WHERE DeviceCustomString1 = 'Q' AND DestinationProcessName = 'DNS'
    AND DeviceCustomString5 IN ('A', 'AAAA', 'HTTPS')
    GROUP BY dstH
    ORDER BY cnt DESC LIMIT 50

    image.png


    Обрезка доменов до второго уровня и исключение IP адресов с помощью регулярного выражения

    SELECT count(ID) as `metric`, cutToFirstSignificantSubdomain(RequestUrl) as `value`, 
    match(`value`, '^([a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$') AS is_valid_domain  
    FROM `events` 
    WHERE DeviceProduct = 'UTM' AND FlexString2 = 'Instant Messaging' AND Type != 3 AND is_valid_domain = 1 
    GROUP BY `value` ORDER BY  `metric` DESC LIMIT 250

    image.png


    Группировка событий по категории из TI (Regex)

    SELECT count(ID) as cnt, extract(TI, '.+category\"\:\"([^"]+).+') as category
    FROM `events`
    WHERE TI !=''
    GROUP BY category
    ORDER BY cnt DESC

    image.png


    Все базовые события конкретного корреляционного события по его ID

    SELECT *
    FROM `events`
    WHERE ID IN (
      SELECT arrayJoin(splitByChar(',', replaceRegexpAll(JSON_QUERY(BaseEvents, '$[*].ID'), '\\[|\\]|"| ', ''))) as TestID
      FROM `events`
      WHERE (Type = 3) AND (ID = '25e4eae3-ad6d-4114-b99e-019de3574d16')
    )

    image.png


    Дерево процесса

    SELECT
        b.DeviceHostName AS HostName, 
        b.SourceAccountID AS AccountID, 
        b.SourceUserName AS UserName, 
        a.DeviceCustomString3 AS GrandParentProcessID, 
        a.SourceProcessName AS GrandParentProcessName, 
        a.DeviceCustomString5 AS ParentProcessID, 
        a.DestinationProcessName AS ParentProcessName, 
        b.DeviceCustomString5 AS ProcessID, 
        b.DestinationProcessName AS ProcessName,
        concat(a.SourceProcessName, ' -> ', a.DestinationProcessName, ' -> ', b.DestinationProcessName) AS ProcessTree
    FROM `events` AS a
    INNER JOIN 
        (
            SELECT *
            FROM `events` 
            WHERE DeviceEventClassID='4688'
        ) AS b
    ON a.DeviceCustomString5 = b.DeviceCustomString3
    WHERE DeviceEventClassID='4688'

    image.png

    Среднее время сессии Windows пользователей за 24 часа

    Рассматривается запрос на основе событий Windows по пользователям (DestinationUserName) событиям входа (EventID 4624) и выхода (EventID 4634) с расчетом среднего времени сесии пользователя за последние 24 часа

    SELECT
        login_events.DestinationUserName AS destination_user_name,
        round(AVG(logout_events.logout_time - login_events.login_time)/1000) AS avg_time_diff_s,
        COUNT(DISTINCT login_events.login_time) AS total_logins,
        COUNT(DISTINCT logout_events.logout_time) AS total_logouts,
        concat(
            toString(floor(avg_time_diff_s / 86400)), ' days, ',
            toString(floor((avg_time_diff_s % 86400) / 3600)), ' hours, ',
            toString(floor((avg_time_diff_s % 3600) / 60)), ' minutes, ',
            toString(avg_time_diff_s % 60), ' seconds'
        ) AS human_readable_diff
    FROM 
        (SELECT
            DestinationUserName,
            toUnixTimestamp(EndTime) AS login_time,
            FlexString1 AS logon_id
        FROM `events`
        WHERE DeviceEventClassID = '4624'
        AND EndTime >= now('Europe/Moscow') - INTERVAL 24 HOUR
        AND DestinationUserName NOT LIKE '%$%') AS login_events
    INNER JOIN 
        (SELECT
            DestinationUserName,
            toUnixTimestamp(EndTime) AS logout_time,
            FlexString1 AS logon_id
        FROM `events`
        WHERE DeviceEventClassID = '4634'
        AND EndTime >= now('Europe/Moscow') - INTERVAL 24 HOUR
        AND DestinationUserName NOT LIKE '%$%') AS logout_events
    
        ON login_events.DestinationUserName = logout_events.DestinationUserName 
        AND logout_events.logon_id = login_events.logon_id
    
    WHERE logout_events.logout_time >= login_events.login_time 
    GROUP BY login_events.DestinationUserName
    ORDER BY avg_time_diff_s DESC 
    LIMIT 100

    Несмотря на ошибку запрос выполняется корректно:

    image.png

    Частота и отношение неуспешных входов к успешным в Windows

    SELECT 
    count(CASE WHEN DeviceEventClassID = '4625' THEN 1 END) as `Failed logins`, 
    count(CASE WHEN DeviceEventClassID = '4624' THEN 1 END) as `Success logins`, 
    `Failed logins`/`Success logins` as `Ratio`
    FROM 'events'
    HAVING `Ratio` > 0.03

    Или

    SELECT countIf(DeviceEventClassID = '4625') as F, countIf(DeviceEventClassID = '4624') as S
    FROM 'events'
    LIMIT 250

    image.png


    Работа с кастомными полями (SA. NA.) 

    Вхождение элемента по полю

    Поиск проискодит по точному совпадению с элементом

    SELECT * 
    FROM `events` 
    WHERE has(SA.user_groups, 'system:serviceaccounts') 
    ORDER BY Timestamp DESC LIMIT 250

    image.png


    Вхождение по подстроке элемента по полю (где более 2 значений в массиве)

    Счет в массиве начинается с нуля

    SELECT * 
    FROM `events` 
    WHERE length(arrayFilter(x -> x LIKE '%serviceaccounts%', SA.user_groups)) > 1 
    ORDER BY Timestamp DESC LIMIT 250

    image.png