2
0
mirror of https://github.com/9ParsonsB/Pulsar.git synced 2025-07-01 08:23:42 -04:00

Journals Now processed in own thread

Some invalid journal data is now handled
Journals now use polymorphic deserialization
Added Event names to all journal events
Remove unused controllers
This commit is contained in:
2024-05-24 17:57:10 +10:00
parent ae848e036d
commit efd0b3e0c0
297 changed files with 827 additions and 209 deletions

View File

@ -0,0 +1,109 @@
namespace Pulsar.Features.Journal;
using Observatory.Framework;
using Observatory.Framework.Files.Journal;
public class JournalProcessor(
ILogger<JournalProcessor> logger,
IJournalService journalService,
IEventHubContext hub) : IJournalProcessor, IHostedService, IDisposable
{
private CancellationTokenSource tokenSource = new();
readonly JsonSerializerOptions options = new()
{
PropertyNameCaseInsensitive = true,
AllowOutOfOrderMetadataProperties = true,
Converters = { new JournalInvalidDoubleConverter(), new JournalInvalidFloatConverter() },
NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals,
};
public async Task HandleFileInner(string filePath, CancellationToken token = new())
{
logger.LogInformation("Processing journal file: '{File}'", filePath);
var file = await File.ReadAllBytesAsync(filePath, token);
var lines = file.Split(Encoding.UTF8.GetBytes(Environment.NewLine)).ToList();
var newJournals = new List<JournalBase>();
//await Parallel.ForEachAsync(lines, new ParallelOptions() { MaxDegreeOfParallelism = 32, TaskScheduler = TaskScheduler.Default, CancellationToken = token}, (line, _) =>
for (var index = 0; index < lines.Count; index++)
{
var line = lines[index];
if (line.Count == 0)
{
// return ValueTask.CompletedTask;
continue;
}
if (line.Contains("\"RotationPeriod\":inf"u8.ToArray()))
{
var newLine = line.Replace("\"RotationPeriod\":inf"u8, "\"RotationPeriod\":0"u8);
line = newLine;
}
try
{
var journal = JsonSerializer.Deserialize<JournalBase>(new ReadOnlySpan<byte>(line.ToArray()), options);
if (journal == null)
{
//return ValueTask.CompletedTask;
continue;
}
newJournals.Add(journal);
}
catch (JsonException ex)
{
logger.LogError(ex, "Error deserializing journal file: '{File}', line: {Line}", filePath, line);
}
//return ValueTask.CompletedTask;
}
if (newJournals.Any())
{
await hub.Clients.All.JournalUpdated(newJournals);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
tokenSource.Dispose();
tokenSource = new();
ProcessQueue();
return Task.CompletedTask;
}
private void ProcessQueue()
{
Task.Run(async () =>
{
while (!tokenSource.Token.IsCancellationRequested)
{
if (journalService.TryDequeue(out var file))
{
await HandleFileInner(file, tokenSource.Token);
}
else
{
await Task.Delay(1000);
}
}
}, tokenSource.Token);
}
public Task StopAsync(CancellationToken cancellationToken)
{
tokenSource?.Cancel();
return Task.CompletedTask;
}
public void Dispose()
{
tokenSource?.Dispose();
}
}
public interface IJournalProcessor
{
}

View File

@ -1,77 +1,42 @@
namespace Pulsar.Features.Journal;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using Observatory.Framework.Files.Journal;
public interface IJournalService : IJournalHandler<List<JournalBase>>;
public interface IJournalService : IJournalHandler<List<JournalBase>>
{
public bool TryDequeue(out string filePath);
}
public class JournalService(
ILogger<JournalService> logger,
IOptions<PulsarConfiguration> options,
IEventHubContext hub,
PulsarContext context,
IServiceProvider serviceProvider
ILogger<JournalService> logger
) : IJournalService
{
public string FileName => FileHandlerService.JournalLogFileName;
static ConcurrentBag<JournalBase> _journals = new();
private readonly ConcurrentQueue<string> JournalFileQueue = new();
static DateTimeOffset notBefore = DateTimeOffset.UtcNow.AddHours(-1);
readonly JsonSerializerOptions options = new()
public void EnqueueFile(string filePath)
{
PropertyNameCaseInsensitive = true,
AllowOutOfOrderMetadataProperties = true,
// Converters = { ActivatorUtilities.CreateInstance<JournalJsonConverter>(serviceProvider) }
};
JournalFileQueue.Enqueue(filePath);
}
public bool TryDequeue(out string filePath)
{
return JournalFileQueue.TryDequeue(out filePath);
}
public async Task HandleFile(string filePath)
public Task HandleFile(string filePath, CancellationToken token = new())
{
if (!FileHelper.ValidateFile(filePath))
{
return;
return Task.CompletedTask;
}
var file = await File.ReadAllLinesAsync(filePath, Encoding.UTF8);
var newJournals = new List<JournalBase>();
await Parallel.ForEachAsync(file, (line, _) =>
{
if (string.IsNullOrWhiteSpace(line))
{
return ValueTask.CompletedTask;
}
var journal = JsonSerializer.Deserialize<JournalBase>(line, options);
if (journal == null)
{
return ValueTask.CompletedTask;
}
if (_journals.Any(j => j.Timestamp == journal.Timestamp && j.GetType() == journal.GetType()))
{
return ValueTask.CompletedTask;
}
_journals.Add(journal);
if (journal.Timestamp < notBefore)
{
return ValueTask.CompletedTask;
}
newJournals.Add(journal);
return ValueTask.CompletedTask;
});
if (newJournals.Any())
{
await hub.Clients.All.JournalUpdated(newJournals);
}
EnqueueFile(filePath);
return Task.CompletedTask;
}
public async Task<List<JournalBase>> Get()
{
return [];