Skip to main content

Интеграция KUMA с KSC

Описание функций ClickHouse дДля интегработыции с KSC в KUMA необходимо заранее создать учетную запись (внутросами: https://clickhouse.com/docs/ru/sql-reference/functions/

Базовые запросы

Типы событий в KUMA

image.png

Подсчннет событий пго полю

SELECT count(ID) as count_num, DeviceVendor FROM `events` GROUP BY DeviceVendor ORDER BY count_num DESC LIMIT 250

image.png

Выполнение математических операций и сравнений

SELECT DeviceProduct, SourceUserName, round(sum(BytesIn)/1024, 2) as KiloBytes FROM `events` WHERE BytesIn > '0' OR BytesOut > '0' GROUP by SourceUserName, DeviceProduct ORDER BY KiloBytes DESC LIMIT 250

image.png

Поиск событий по подстроке (регистрозависимый) с условием И

SELECT * FROM `events` WHERE DeviceHostName like '%serv%' AND DeviceProduct = 'Windows' ORDER BY Timestamp DESC LIMIT 250

image.png

Поиск событий по подстроке (регистроНЕзависимый)

SELECT * FROM `events` WHERE DeviceEventCategory ilike '%auditing%' AND DeviceProduct = 'Windows' ORDER BY Timestamp DESC LIMIT 250

image.png

Поиск событий по нескольким значениям, вместо OR можно использоватьеля) IN

в

SELECT * FROM `events` WHERE DeviceEventClassID IN ('BROKER_USERLOGGEDOUT', 'BROKER_USERLOGGEDIN') AND DeviceProduct = 'Horizon' ORDER BY Timestamp DESC LIMIT 250KSC:

image.png

Поиск событий по подсети

SELECT * FROM `events` WHERE inSubnet(DeviceAddress, '10.68.85.70/32') ORDER BY Timestamp DESC LIMIT 250

image.png

Проверка работы обогащения

Обогащение событий Активами

SELECT * FROM `events` WHERE DeviceAssetID != '' OR SourceAssetID != '' OR DestinationAssetID != '' ORDER BY Timestamp DESC LIMIT 250

 

Обогащение событий с LDAP

SELECT * FROM `events` WHERE SourceAccountID != '' OR DestinationAccountID != '' ORDER BY Timestamp DESC LIMIT 250

 

Обогащение событий данными из  TI

SELECT * FROM `events` where not TI="" ORDER BY Timestamp DESC LIMIT 250

 

Работа с Extra полем

Поиск событий по полю Extra содержащие ключ

SELECT * FROM `events` WHERE visitParamHas(Extra, 'memUsage') ORDER BY Timestamp DESC LIMIT 250

image.pngimage.png

 

П

Минимальные разрешения для интеграции с KSC такие:

  • Access objects regardless of their ACLs — позволяет просто импортировать активы
  • Management of administration groups — позволяет перемещать активы между группами в KSC из интерфейска KUMA
  • Basic functionality на KES — позволяет собыздавать ий по полю Extra содержащие ключ с определенным запускать задачи начением

SELECTхостах * FROM `events` WHERE visitParamExtractString(Extra, 'memUsage') = '61367' ORDER BY Timestamp DESC LIMIT 250

SELECT * FROM `events` WHERE JSONExtractString(Extra, 'memUsage') = '61367' ORDER BY Timestamp DESC LIMIT 250

image.pngimage.png

 

ПоискНа стороне KUMA необыходимо указать адрес ий порт 13299 (этот порт исполю Extra НЕ ьзуетсодержащие ключ с определенным значением

SELECT * FROM `events` WHERE visitParamHas(Extra, 'memUsage') AND NOT visitParamExtractString(Extra, 'memUsage') = '61367' ORDER BY Timestamp DESC LIMIT 250

image.png

 

Работа со временем 

Timestampя по умолчанию), в формате epoch time числа  с милисекундами (UnixTimestamp)

Список также УЗ о которойм згон:ворилось https://en.wikipedia.org/wiki/List_of_tz_database_time_zones выше.

image.pngimage.png

При

нажатии

Зв KUMA над кнопку "Сохрание таймзоны

SELECT Timestamp, fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow') as NormTimeь", DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250

image.png

 

Указание своего формата времени

SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S') as NormTime, DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250

image.png

 

Свой формат времени с таймзоной

SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S', 'Europe/Moscow') as NormTime, DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' ORDER BY Timestamp DESC LIMIT 250

image.png

 

События в "рабочее время" с 9 должно 18

SELECT Timestamp, formatDateTime(fromUnixTimestamp64Milli(Timestamp), '%d-%m-%Y %H:%M:%S', 'Europe/Moscow') as NormTime,DeviceProduct, Name, Message FROM `events` WHERE DeviceProduct = 'EDR' AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) >= 9 AND toHour(fromUnixTimestamp64Milli(Timestamp, 'Europe/Moscow')) < 18 ORDER BY Timestamp ASC LIMIT 250

image.png

 

Подвсчеплавать сообытия по фильтру  за период вчерашнего дня с 00 до 04 часов с группировкой по SourceAddress

SELECT count(ID) as Count_Num, SourceAddress, groupArray(fromUnixTimestamp64Milli(Timestamp)) as time_stm FROM `events` WHERE DeviceEventClassID = '4624' AND DeviceProduct = 'Windows' AND Timestamp >= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)))*1000 AND Timestamp <= toUnixTimestamp(toStartOfDay((now() - INTERVAL 1 DAY)) + INTERVAL 4 HOUR)*1000 GROUP BY SourceAddress LIMIT 250

image.png

Фиксируются таймштемпы входа от одного источника с подсчетом количества.

 

Экстра запросы

Склейка по времени подключщений пользователей сб одногшибке, это адреса и значит, один VPN сервер

SELECT SourceUserName, SourceAddress, SourceHostName, groupArray(FlexString1) as time FROM `events` where SourceProcessName = 'Create session' GROUP BY SourceUserName, SourceAddress, SourceHostName LIMIT 10

image.png

 

Обрезка доменов до вчторого уровня и условие с несколькими запросами

SELECT count(ID) as cnt, cutToFirstSignificantSubdomain(DestinationHostName) as dstH FROM `events` WHERE DeviceCustomString1 = 'Q' AND DestinationProcessName = 'DNS' AND DeviceCustomString5 IN ('A', 'AAAA', 'HTTPS') GROUP BY dstH ORDER BY cnt DESC LIMIT 50

image.png

 

Найти и вывести все базовые события конкретного корреляционного события по его ID

SELECT * FROM `events` WHERE ID IN (SELECT arrayJoin(splitByChar(',', replaceRegexpAll(JSON_QUERY(BaseEvents, '$[*]ОК.ID'), '\\[|\\]|"| ', ''))) as TestID FROM `events` WHERE (Type = 3) AND (ID = '25e4eae3-ad6d-4114-b99e-019de3574d16'))

image.png