Перейти к основному содержимому

Обработчики потоков данных

Конфигурация обработчиков событий настраивается на вкладке Конфигурация соответствующего Потока данных (руководство)

Настройка непосредственно сценария обработки производится на визуальном движке программирования (руководство)

Внимание

После внесения изменений в сценарии необходимо произвести его компиляцию соответствующей кнопкой в верхней панели меню.

Изменения в сценарии, после компиляции, подтягиваются в Поток данных автоматически.

Системные функции обработчиков событий

DefaultParser

  • Категория: Automaton.Cl

  • Тип функции: Impure

  • Описание:

    Функция принимает на вход структуру, содержащую событие с коллектора логов Monq. Преобразовывает и возвращает валидное событие для записи в БД.

    • Inputs

      НазваниеТипОписаниеПараметры
      InExecПин вызова функцииСвязь
      EventStruct: RawCollectorLogEventПринимает исходное событиеСвязь
    • Outputs

      НазваниеТипОписаниеПараметры
      OkExecПин вызова функции, если преобразование выполнено успешноСвязь
      MonitoringEventExecПин вызова функции, в случае получения события мониторингаСвязь
      FailedExecПин вызова функции при наличии ошибкиСвязь
      ResultStruct:OnProcessedLogEventВозвращает результат обработчикаСвязь
      ErrorStringВозвращает сообщение об ошибке обработчикаСвязь

Исходный код функции DefaultParser

DefaultParser
namespace Automaton.Cl
{
public static class DefaultParserGlobalFunc
{

static FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)> ValidateData(string data, string sourceType, out System.Xml.XmlDocument xmlDocument)
{
xmlDocument = null;
switch (sourceType)
{
case "application/json":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
data = data.Trim();
if (!(data.StartsWith("{") && data.EndsWith("}")) && !(data.StartsWith("[") && data.EndsWith("]")))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source has an incorrect format."));
try
{
System.Text.Json.Nodes.JsonNode.Parse(data);
}
catch (System.Text.Json.JsonException ex)
{
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, ex.Message));
}

return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
case "application/x-ndjson":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
data = data.Replace("\r", "").Trim();
if (!(data.StartsWith("{") && data.EndsWith("}")))
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source has an incorrect format."));
try
{
var splitedJsons = data.Split("\n");
foreach (var json in splitedJsons)
System.Text.Json.Nodes.JsonNode.Parse(json);
}
catch (System.Text.Json.JsonException ex)
{
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Failed", (null, ex.Message));
}

return new FuncResult<(ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}

case "text/plain":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));

return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}

case "text/xml":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
data = data.Trim();
try
{
var doc = new System.Xml.XmlDocument();
doc.LoadXml(data);
xmlDocument = doc;
}
catch (System.Xml.XmlException ex)
{
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, ex.Message));
}

return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
}

return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Source type has an incorrect format."));
}

public static FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent Result, string Error)> DefaultParser(Automaton.Cl.RawCollectorLogEvent Event)
{
var data = Event.source;
var sourceType = Event._sourceType;
var funcResult = ValidateData(data, sourceType, out var xmlDocument);
if (funcResult.OutPinName == "Failed")
return funcResult;
data = data.Replace("\r", "").Trim();

var processedLogEvent = new Automaton.Cl.ProcessedCollectorLogEvent()
{
_userspaceId = Event._userspaceId,
_stream = Event._stream,
_sourceType = Event._sourceType,
_rawId = Event._rawId,
_aggregatedAt = Event._aggregatedAt
};

switch (sourceType)
{
case "application/json":
{
// Проверка события на тип MonitoringEvent.
var node = System.Text.Json.Nodes.JsonNode.Parse(data);
processedLogEvent.source = node;
if (node is System.Text.Json.Nodes.JsonObject jsonObject && jsonObject.ContainsKey("monqStreamControl"))
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("MonitoringEvent", (processedLogEvent, "The event is stream monitoring event."));

return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
case "application/x-ndjson":
{
data = data.Replace("\r", "").Trim();
var jsonArray = $"[{string.Join(",", data.Split("\n"))}]";
var parsed = System.Text.Json.Nodes.JsonNode.Parse(jsonArray);
processedLogEvent.source = parsed;
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
case "text/xml":
{
var json = Newtonsoft.Json.JsonConvert.SerializeXmlNode(xmlDocument);
processedLogEvent.source = json;
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
case "text/plain":
{
var result = System.Text.Json.JsonEncodedText.Encode(data);
var json = $@"{{""sourceText"": ""{result}""}}";
processedLogEvent.source = json;
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
default:
{
processedLogEvent.source = data;
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
}

}
}
}

AddLabels

  • Категория: Automaton.Cl

  • Тип функции: Impure

  • Описание:

    Функция принимает на вход структуру OnProcessedLogEvent, и массив объектов, который будет добавлен к объекту _labels, входящему в состав структуры.

    • Inputs

      НазваниеТипОписаниеПараметры
      InExecПин вызова функцииСвязь
      ProcessedEventStruct:OnProcessedLogEventПринимает на вход структуру OnProcessedLogEventСвязь
      LabelsDynamic (array)Массив метокСвязь
    • Outputs

      НазваниеТипОписаниеПараметры
      OutExecПин вызова функцииСвязь
      ResultStruct:OnProcessedLogEventОбновленная модель события с добавленными метками.Связь

SendAutomatonEvent

  • Категория: Automaton.Core

  • Тип функции: Impure

  • Описание:

    Функция принимает событие и отправляет в очередь RabbitMQ по указанному пользователем ключу события.

    • Inputs

      НазваниеТипОписаниеПараметры
      InExecПин вызова функцииСвязь
      ValueStruct: AnyПринимает событие, которое будет отправлено в указанную очередьСвязь
      EventNameStringСтрока для указания ключаСвязь/контрол
      ScenarioStruct:ScenarioBasicСистемная переменная, содержит метаданные текущего сценарияСвязь
    • Outputs

      НазваниеТипОписаниеПараметры
      OkExecПоследовательность активна при успешной отправке событияСвязь
      FailedExecПоследовательность активна при ошибке отправки событияСвязь
      ErrorStringТекст ошибки (при наличии)Связь