2
0
mirror of https://github.com/9ParsonsB/Pulsar.git synced 2025-07-01 08:23:42 -04:00

Update Journal File Handling

Now Correctly deserializes Journal events
This commit is contained in:
2024-05-12 12:55:28 +10:00
parent e59ca066ab
commit bd811c861c
29 changed files with 714 additions and 442 deletions

View File

@ -89,6 +89,6 @@ public class FileHandlerService(
}
logger.LogInformation("Handling file {FileName} with Type {Type}", fileName, handler.GetType().ToString());
Task.Run(() => handler.HandleFile(path));
await handler.HandleFile(path);
}
}

View File

@ -3,44 +3,67 @@ namespace Pulsar.Features;
using System.Collections.Concurrent;
using Microsoft.Extensions.FileProviders;
public class FileWatcherService(IOptions<PulsarConfiguration> options, IFileHandlerService fileHandlerService) : IHostedService
public class FileWatcherService(IOptions<PulsarConfiguration> options, IFileHandlerService fileHandlerService)
: IHostedService
{
private PhysicalFileProvider watcher = null!;
public Task StartAsync(CancellationToken cancellationToken)
{
if (!Directory.Exists(options.Value.JournalDirectory))
{
throw new Exception($"Directory {options.Value.JournalDirectory} does not exist.");
}
watcher = new PhysicalFileProvider(options.Value.JournalDirectory);
Watch();
// read the journal directory to get the initial files
#if DEBUG
Task.Run(() =>
{
Thread.Sleep(TimeSpan.FromSeconds(10));
HandleFileChanged();
}, cancellationToken);
#else
HandleFileChanged();
#endif
return Task.CompletedTask;
}
ConcurrentDictionary<string, DateTimeOffset> FileDates = new();
private void HandleFileChanged(object? sender)
private void HandleFileChanged(object? sender = null)
{
foreach (var file in watcher.GetDirectoryContents(""))
{
if (file.IsDirectory || !file.Name.EndsWith(".json") && !(file.Name.StartsWith(FileHandlerService.JournalLogFileNameStart) && file.Name.EndsWith(FileHandlerService.JournalLogFileNameEnd)))
if (file.IsDirectory || !file.Name.EndsWith(".json") &&
!(file.Name.StartsWith(FileHandlerService.JournalLogFileNameStart) &&
file.Name.EndsWith(FileHandlerService.JournalLogFileNameEnd)))
{
continue;
}
var existing = FileDates.GetOrAdd(file.PhysicalPath, file.LastModified);
if (existing != file.LastModified)
FileDates.AddOrUpdate(file.PhysicalPath, _ =>
{
fileHandlerService.HandleFile(file.PhysicalPath);
}
Task.Run(() => fileHandlerService.HandleFile(file.PhysicalPath));
return file.LastModified;
}, (_, existing) =>
{
if (existing != file.LastModified)
{
Task.Run(() => fileHandlerService.HandleFile(file.PhysicalPath));
}
return file.LastModified;
});
}
Watch();
}
private void Watch()
{
watcher.Watch("*.*").RegisterChangeCallback(HandleFileChanged, null);

View File

@ -1,12 +1,12 @@
namespace Pulsar.Features.Journal;
using System.Collections.Concurrent;
using System.Text.RegularExpressions;
using Observatory.Framework.Files.Journal;
public interface IJournalService : IJournalHandler<List<JournalBase>>;
public class JournalService
(
public class JournalService(
ILogger<JournalService> logger,
IOptions<PulsarConfiguration> options,
IEventHubContext hub,
@ -14,74 +14,52 @@ public class JournalService
) : IJournalService
{
public string FileName => FileHandlerService.JournalLogFileName;
static ConcurrentBag<JournalBase> _journals = new();
public Task HandleFile(string filePath) => HandleFile(filePath, CancellationToken.None);
public async Task HandleFile(string filePath, CancellationToken token)
static DateTimeOffset notBefore = DateTimeOffset.UtcNow.AddHours(-1);
public async Task HandleFile(string filePath)
{
if (!FileHelper.ValidateFile(filePath))
{
return;
}
var file = await File.ReadAllLinesAsync(filePath, Encoding.UTF8, token);
var journals = file.Select(line => JsonSerializer.Deserialize<JournalBase>(line)).ToList();
var file = await File.ReadAllLinesAsync(filePath, Encoding.UTF8);
var newJournals = new List<JournalBase>();
var notBefore = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromHours(6));
foreach (var journal in journals)
var select = file.AsParallel().Select(line => JsonSerializer.Deserialize<JournalBase>(line,
new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
Converters = { new JournalConverter(logger) }
}));
foreach (var journal in select)
{
if (context.Journals.Any(j => j.Timestamp == journal.Timestamp && j.Event == journal.Event))
if (_journals.Any(j => j.Timestamp == journal.Timestamp && j.Event == journal.Event))
{
continue;
}
if (journal.Timestamp < notBefore)
{
continue;
}
context.Journals.Add(journal);
if (journal.Timestamp > notBefore)
{
newJournals.Add(journal);
}
_journals.Add(journal);
newJournals.Add(journal);
}
await hub.Clients.All.JournalUpdated(newJournals);
if (newJournals.Any())
{
await hub.Clients.All.JournalUpdated(newJournals);
}
}
public async Task<List<JournalBase>> Get()
{
var folder = new DirectoryInfo(options.Value.JournalDirectory);
var regex = new Regex(FileHandlerService.JournalLogFileNameRegEx);
if (!folder.Exists)
{
logger.LogWarning("Journal directory {JournalDirectory} does not exist", folder.FullName);
return [];
}
var dataFileName = folder.GetFiles().FirstOrDefault(f => regex.IsMatch(f.Name))?.FullName;
if (!FileHelper.ValidateFile(dataFileName))
{
return [];
}
// Seems each entry is a new line. Not sure if this can be relied on?
var logs = File.ReadAllLines(dataFileName);
var journals = new List<JournalBase>();
foreach (var log in logs)
{
// var info = JournalReader.ObservatoryDeserializer<JournalBase>(log);
var info = JsonSerializer.Deserialize<JournalBase>(log);
if (info != null)
{
journals.Add(info);
}
}
if (journals.Count > 0) return journals;
logger.LogWarning("Failed to deserialize module info file {file}", dataFileName);
return [];
await hub.Clients.All.JournalUpdated(_journals.ToList());
return _journals.ToList();
}
}
}

View File

@ -13,31 +13,6 @@ public class StatusService
{
public string FileName => FileHandlerService.StatusFileName;
public bool ValidateFile(string filePath)
{
if (!File.Exists(filePath))
{
logger.LogWarning("Journal file {JournalFile} does not exist", filePath);
return false;
}
var fileInfo = new FileInfo(filePath);
if (!string.Equals(fileInfo.Name, FileName, StringComparison.InvariantCultureIgnoreCase))
{
logger.LogWarning("Journal file {name} is not valid");
return false;
}
if (fileInfo.Length == 0)
{
logger.LogWarning("Journal file {name} is empty", filePath);
return false;
}
return true;
}
public async Task HandleFile(string filePath)
{
if (!FileHelper.ValidateFile(filePath))
@ -46,6 +21,13 @@ public class StatusService
}
var file = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
if (file.Length < 2)
{
logger.LogWarning("File {FilePath} is empty", filePath);
return;
}
var status = await JsonSerializer.DeserializeAsync<Status>(file);
if (status == null)