Обработчики потоков данных
Конфигурация обработчиков событий настраивается на вкладке Конфигурация соответствующего Потока данных (руководство)
Настройка непосредственно сценария обработки производится на визуальном движке программирования (руководство)
После внесения изменений в сценарии необходимо произвести его компиляцию соответствующей кнопкой в верхней панели меню.
Изменения в сценарии, после компиляции, подтягиваются в Поток данных автоматически.
Системные функции обработчиков событий
DefaultParser
-
Категория: Automaton.Cl
-
Тип функции: Impure
-
Описание:
Функция принимает на вход структуру, содержащую событие с коллектора логов Monq. Преобразовывает и возвращает валидное событие для записи в БД.
-
Inputs
Название Тип Описание Параметры In Exec Пин вызова функции Связь Event Struct: RawCollectorLogEvent Принимает исходное событие Связь -
Outputs
Название Тип Описание Параметры Ok Exec Пин вызова функции, если преобразование выполнено успешно Связь MonitoringEvent Exec Пин вызова функции, в случае получения события мониторинга Связь Failed Exec Пин вызова функции при наличии ошибки Связь Result Struct:OnProcessedLogEvent Возвращает результат обработчика Связь Error String Возвращает сообщение об ошибке обработчика Связь
-
Исходный код функции DefaultParser
namespace Automaton.Cl
{
public static class DefaultParserGlobalFunc
{
static FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)> ValidateData(string data, string sourceType, out System.Xml.XmlDocument xmlDocument)
{
xmlDocument = null;
switch (sourceType)
{
case "application/json":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
data = data.Trim();
if (!(data.StartsWith("{") && data.EndsWith("}")) && !(data.StartsWith("[") && data.EndsWith("]")))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source has an incorrect format."));
try
{
System.Text.Json.Nodes.JsonNode.Parse(data);
}
catch (System.Text.Json.JsonException ex)
{
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, ex.Message));
}
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
case "application/x-ndjson":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
data = data.Replace("\r", "").Trim();
if (!(data.StartsWith("{") && data.EndsWith("}")))
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source has an incorrect format."));
try
{
var splitedJsons = data.Split("\n");
foreach (var json in splitedJsons)
System.Text.Json.Nodes.JsonNode.Parse(json);
}
catch (System.Text.Json.JsonException ex)
{
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Failed", (null, ex.Message));
}
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
case "text/plain":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
case "text/xml":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
data = data.Trim();
try
{
var doc = new System.Xml.XmlDocument();
doc.LoadXml(data);
xmlDocument = doc;
}
catch (System.Xml.XmlException ex)
{
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, ex.Message));
}
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
}
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Source type has an incorrect format."));
}
public static FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent Result, string Error)> DefaultParser(Automaton.Cl.RawCollectorLogEvent Event)
{
var data = Event.source;
var sourceType = Event._sourceType;
var funcResult = ValidateData(data, sourceType, out var xmlDocument);
if (funcResult.OutPinName == "Failed")
return funcResult;
data = data.Replace("\r", "").Trim();
var processedLogEvent = new Automaton.Cl.ProcessedCollectorLogEvent()
{
_userspaceId = Event._userspaceId,
_stream = Event._stream,
_sourceType = Event._sourceType,
_rawId = Event._rawId,
_aggregatedAt = Event._aggregatedAt
};
switch (sourceType)
{
case "application/json":
{
// Проверка события на тип MonitoringEvent.
var node = System.Text.Json.Nodes.JsonNode.Parse(data);
processedLogEvent.source = node;
if (node is System.Text.Json.Nodes.JsonObject jsonObject && jsonObject.ContainsKey("monqStreamControl"))
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("MonitoringEvent", (processedLogEvent, "The event is stream monitoring event."));
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
case "application/x-ndjson":
{
data = data.Replace("\r", "").Trim();
var jsonArray = $"[{string.Join(",", data.Split("\n"))}]";
var parsed = System.Text.Json.Nodes.JsonNode.Parse(jsonArray);
processedLogEvent.source = parsed;
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
case "text/xml":
{
var json = Newtonsoft.Json.JsonConvert.SerializeXmlNode(xmlDocument);
processedLogEvent.source = json;
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
case "text/plain":
{
var result = System.Text.Json.JsonEncodedText.Encode(data);
var json = $@"{{""sourceText"": ""{result}""}}";
processedLogEvent.source = json;
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
default:
{
processedLogEvent.source = data;
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
}
}
}
}
AddLabels
-
Категория: Automaton.Cl
-
Тип функции: Impure
-
Описание:
Функция принимает на вход структуру
OnProcessedLogEvent
, и массив объектов, который будет добавлен к объекту_labels
, входящему в состав структуры.-
Inputs
Название Тип Описание Параметры In Exec Пин вызова функции Связь ProcessedEvent Struct:OnProcessedLogEvent Принимает на вход структуру OnProcessedLogEvent Связь Labels Dynamic (array) Массив меток Связь -
Outputs
Название Тип Описание Параметры Out Exec Пин вызова функции Связь Result Struct:OnProcessedLogEvent Обновленная модель события с добавленными метками. Связь
-
SendAutomatonEvent
-
Категория: Automaton.Core
-
Тип функции: Impure
-
Описание:
Функция принимает событие и отправляет в очередь RabbitMQ по указанному пользователем ключу события.
-
Inputs
Название Тип Описание Параметры In Exec Пин вызова функции Связь Value Struct: Any Принимает событие, которое будет отправлено в указанную очередь Связь EventName String Строка для указания ключа Связь/контрол Scenario Struct:ScenarioBasic Системная переменная, содержит метаданные текущего сценария Связь -
Outputs
Название Тип Описание Параметры Ok Exec Последовательность активна при успешной отправке события Связь Failed Exec Последовательность активна при ошибке отправки события Связь Error String Текст ошибки (при наличии) Связь
-