Правила сбора и анализа данных (Data Mining)
В отличие от потоковой корреляции, работающей в режиме реального времени, Data Mining правила позволяют с помощью языка SQL и функций ClickHouse (примеры запросов, почти все возможно использовать) распознавать и анализировать события, сохраненных в хранилище KUMA (можно указать и конкретный спейс хранилища).
Для работы необходимо указать, рассмотрим на примере:
- В Ресурсах - Правила сбора и анализа данных Создать правило
- В правиле указать:
-
- Интервал (частота) выполнения SQL-запроса можно указать в минутах, часах и днях (минимум 1 минута)
- SQL-запрос должен содержать функцию агрегации (примеры) и/или группировку (GROUP BY) данных c обязательным указанием ограничения LIMIT (от 1 до 10 000)
Каждое выполнение такого правила происходит в виде запроса в Хранилище, а это значит неосторожным движением в виде частого или тяжелого правила можно нагрузить базу больше чем хотелось бы
В примере рассматривается запрос на основе событий Windows по пользователям (DestinationUserName) событиям входа (EventID 4624) и выхода (EventID 4634) с расчетом среднего времени сесии пользователя за последние 24 часа.
Посмотреть SQL запрос (пример)
SELECT
login_events.DestinationUserName AS destination_user_name,
round(AVG(logout_events.logout_time - login_events.login_time)/1000) AS avg_time_diff_s,
COUNT(DISTINCT login_events.login_time) AS total_logins,
COUNT(DISTINCT logout_events.logout_time) AS total_logouts,
concat(
toString(floor(avg_time_diff_s / 86400)), ' days, ',
toString(floor((avg_time_diff_s % 86400) / 3600)), ' hours, ',
toString(floor((avg_time_diff_s % 3600) / 60)), ' minutes, ',
toString(avg_time_diff_s % 60), ' seconds'
) AS human_readable_diff
FROM
(SELECT
DestinationUserName,
toUnixTimestamp(EndTime) AS login_time,
FlexString1 AS logon_id
FROM `events`
WHERE DeviceEventClassID = '4624'
AND EndTime >= now() - INTERVAL 24 HOUR
AND DestinationUserName NOT LIKE '%$%') AS login_events
INNER JOIN
(SELECT
DestinationUserName,
toUnixTimestamp(EndTime) AS logout_time,
FlexString1 AS logon_id
FROM `events`
WHERE DeviceEventClassID = '4634'
AND EndTime >= now() - INTERVAL 24 HOUR
AND DestinationUserName NOT LIKE '%$%') AS logout_events
ON login_events.DestinationUserName = logout_events.DestinationUserName
AND logout_events.logon_id = login_events.logon_id
WHERE logout_events.logout_time >= login_events.login_time
GROUP BY login_events.DestinationUserName
ORDER BY avg_time_diff_s DESC
LIMIT 100
-
- Добавить маппинг (сопоставление) по полям запроса и модели KUMA
-
- Привязать хранилище по которому будет осуществляться поиск
- Привязать коррелятор с соответвующим правилом корреляции для сработки
- В Ресурсах - Сбор и анализ данных добавить ранее созданное правило
- Для ручного запуска нажмите кнопку Запустить
- По результатам запроса на выходе будут какие-то данные, которые не будут нигде сохраняться, но на них можно настроить правило корреляции. В нашем случае правило ловит события, где время сессии меньше 5 секунд:
Корреляционное событие выглядит следующим образом:
А событие на основе которого произошла сработка:
Работу правил можно отслеживать с помощью метрик в разделе KUMA Core:
No Comments