mirror of
https://github.com/9ParsonsB/Pulsar.git
synced 2025-07-01 16:33:43 -04:00
Add Startup Events to Database
Now emit startup events on conneciton Some events still to add
This commit is contained in:
@ -1,11 +1,19 @@
|
||||
using Pulsar.Features.Journal;
|
||||
|
||||
namespace Pulsar.Features;
|
||||
|
||||
using Observatory.Framework.Files;
|
||||
using Observatory.Framework.Files.Journal;
|
||||
using Observatory.Framework.Files.Journal.Odyssey;
|
||||
|
||||
public class EventsHub : Hub<IEventsHub>
|
||||
public class EventsHub(IJournalService journalService) : Hub<IEventsHub>
|
||||
{
|
||||
public override async Task OnConnectedAsync()
|
||||
{
|
||||
await base.OnConnectedAsync();
|
||||
await Clients.Caller.JournalUpdated(await journalService.GetLastStartupEvents());
|
||||
}
|
||||
|
||||
public async Task Status([FromServices] IStatusService statusService)
|
||||
{
|
||||
var status = await statusService.Get();
|
||||
|
@ -19,12 +19,7 @@ public class FileWatcherService(IOptions<PulsarConfiguration> options, IFileHand
|
||||
Watch(cancellationToken);
|
||||
|
||||
// read the journal directory to get the initial files
|
||||
#if DEBUG
|
||||
Thread.Sleep(TimeSpan.FromSeconds(2));
|
||||
HandleFileChanged(cancellationToken);
|
||||
#else
|
||||
HandleFileChanged(cancellationToken);
|
||||
#endif
|
||||
|
||||
|
||||
return Task.CompletedTask;
|
||||
|
@ -1,18 +1,19 @@
|
||||
using Observatory.Framework.Files.Journal.Startup;
|
||||
using Observatory.Framework.Files.Journal.StationServices;
|
||||
|
||||
namespace Pulsar.Features.Journal;
|
||||
|
||||
using Observatory.Framework;
|
||||
using Observatory.Framework.Files.Journal;
|
||||
using Observatory.Framework.Files.Journal.Startup;
|
||||
|
||||
public class JournalProcessor(
|
||||
ILogger<JournalProcessor> logger,
|
||||
IJournalService journalService,
|
||||
IJournalStore journalStore,
|
||||
PulsarContext context,
|
||||
IEventHubContext hub) : IHostedService, IDisposable
|
||||
{
|
||||
private CancellationTokenSource tokenSource = new();
|
||||
|
||||
|
||||
readonly JsonSerializerOptions options = new()
|
||||
{
|
||||
PropertyNameCaseInsensitive = true,
|
||||
@ -52,24 +53,74 @@ public class JournalProcessor(
|
||||
continue;
|
||||
}
|
||||
|
||||
if (journal is LoadGame loadGame)
|
||||
switch (journal)
|
||||
{
|
||||
// if not existing, add
|
||||
if (context.LoadGames.Any(l => l.Timestamp == loadGame.Timestamp))
|
||||
{
|
||||
//return ValueTask.CompletedTask;
|
||||
case Commander commander when context.Commander.Any(c => c.Timestamp == commander.Timestamp):
|
||||
continue;
|
||||
}
|
||||
await context.LoadGames.AddAsync(loadGame, token);
|
||||
await context.SaveChangesAsync(token);
|
||||
case Commander commander:
|
||||
await context.Commander.AddAsync(commander, token);
|
||||
await context.SaveChangesAsync(token);
|
||||
break;
|
||||
case Materials materials when context.Materials.Any(m => m.Timestamp == materials.Timestamp):
|
||||
continue;
|
||||
case Materials materials:
|
||||
await context.Materials.AddAsync(materials, token);
|
||||
await context.SaveChangesAsync(token);
|
||||
break;
|
||||
case Rank rank when context.Rank.Any(r => r.Timestamp == rank.Timestamp):
|
||||
continue;
|
||||
case Rank rank:
|
||||
await context.Rank.AddAsync(rank, token);
|
||||
await context.SaveChangesAsync(token);
|
||||
break;
|
||||
case Progress progress when context.Progress.Any(p => p.Timestamp == progress.Timestamp):
|
||||
continue;
|
||||
case Progress progress:
|
||||
await context.Progress.AddAsync(progress, token);
|
||||
await context.SaveChangesAsync(token);
|
||||
break;
|
||||
case Reputation reputation when context.Reputation.Any(r => r.Timestamp == reputation.Timestamp):
|
||||
continue;
|
||||
case Reputation reputation:
|
||||
await context.Reputation.AddAsync(reputation, token);
|
||||
await context.SaveChangesAsync(token);
|
||||
break;
|
||||
|
||||
case EngineerProgress engineerProgress
|
||||
when context.EngineerProgress.Any(e => e.Timestamp == engineerProgress.Timestamp):
|
||||
continue;
|
||||
case EngineerProgress engineerProgress:
|
||||
await context.EngineerProgress.AddAsync(engineerProgress, token);
|
||||
await context.SaveChangesAsync(token);
|
||||
break;
|
||||
case LoadGame loadGame when context.LoadGames.Any(l => l.Timestamp == loadGame.Timestamp):
|
||||
continue;
|
||||
case LoadGame loadGame:
|
||||
await context.LoadGames.AddAsync(loadGame, token);
|
||||
await context.SaveChangesAsync(token);
|
||||
break;
|
||||
|
||||
|
||||
case Statistics statistics when context.Statistics.Any(s => s.Timestamp == statistics.Timestamp):
|
||||
continue;
|
||||
case Statistics statistics:
|
||||
await context.Statistics.AddAsync(statistics, token);
|
||||
await context.SaveChangesAsync(token);
|
||||
break;
|
||||
|
||||
}
|
||||
|
||||
newJournals.Add(journal);
|
||||
}
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
logger.LogError(ex, "Error deserializing journal file: '{File}', line: {Line}", filePath, line);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error processing journal file: '{File}', line# {LineNumber}, line: {Line}",
|
||||
filePath, index, Encoding.UTF8.GetString(line.ToArray()));
|
||||
}
|
||||
|
||||
//return ValueTask.CompletedTask;
|
||||
}
|
||||
@ -95,7 +146,7 @@ public class JournalProcessor(
|
||||
{
|
||||
try
|
||||
{
|
||||
if (journalService.TryDequeue(out var file))
|
||||
if (journalStore.TryDequeue(out var file))
|
||||
{
|
||||
handled.AddRange(await HandleFileInner(file, tokenSource.Token));
|
||||
}
|
||||
@ -108,6 +159,7 @@ public class JournalProcessor(
|
||||
{
|
||||
handled = handled.Where(j => j.Timestamp > lastLoadGame.Timestamp).ToList();
|
||||
}
|
||||
|
||||
await hub.Clients.All.JournalUpdated(handled);
|
||||
handled.Clear();
|
||||
}
|
||||
@ -134,4 +186,4 @@ public class JournalProcessor(
|
||||
{
|
||||
tokenSource?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,31 +1,24 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Observatory.Framework.Files.Journal.Startup;
|
||||
using Observatory.Framework.Files.Journal.StationServices;
|
||||
|
||||
namespace Pulsar.Features.Journal;
|
||||
|
||||
using System.Collections.Concurrent;
|
||||
using Observatory.Framework.Files.Journal;
|
||||
|
||||
public interface IJournalService : IJournalHandler<List<JournalBase>>
|
||||
{
|
||||
public bool TryDequeue(out string filePath);
|
||||
Task<List<JournalBase>> GetLastStartupEvents();
|
||||
}
|
||||
|
||||
public class JournalService(
|
||||
ILogger<JournalService> logger
|
||||
ILogger<JournalService> logger,
|
||||
IJournalStore store,
|
||||
PulsarContext context
|
||||
) : IJournalService
|
||||
{
|
||||
public string FileName => FileHandlerService.JournalLogFileName;
|
||||
|
||||
private readonly ConcurrentQueue<string> JournalFileQueue = new();
|
||||
|
||||
public void EnqueueFile(string filePath)
|
||||
{
|
||||
JournalFileQueue.Enqueue(filePath);
|
||||
}
|
||||
|
||||
public bool TryDequeue(out string filePath)
|
||||
{
|
||||
return JournalFileQueue.TryDequeue(out filePath);
|
||||
}
|
||||
|
||||
public Task HandleFile(string filePath, CancellationToken token = new())
|
||||
{
|
||||
if (!FileHelper.ValidateFile(filePath))
|
||||
@ -33,10 +26,102 @@ public class JournalService(
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
EnqueueFile(filePath);
|
||||
store.EnqueueFile(filePath);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
// Start of game events/order:
|
||||
/** Commander
|
||||
* Materials
|
||||
Rank
|
||||
Progress
|
||||
Reputation
|
||||
EngineerProgress
|
||||
LoadGame
|
||||
--Some time later--
|
||||
Statistics
|
||||
-- Game Events (e.g. FSSSignalDiscovered) --
|
||||
Location
|
||||
Powerplay
|
||||
ShipLocker
|
||||
Missions
|
||||
Loadout
|
||||
Cargo
|
||||
*/
|
||||
|
||||
// StartupEvents:
|
||||
// Commander
|
||||
// Materials
|
||||
// Rank
|
||||
// Progress
|
||||
// Reputation
|
||||
// EngineerProgress
|
||||
// LoadGame
|
||||
|
||||
// StateEvents:
|
||||
// Location
|
||||
// Powerplay
|
||||
// Music
|
||||
// ShipLocker
|
||||
// Missions
|
||||
// Loadout
|
||||
// Cargo
|
||||
public async Task<List<JournalBase>> GetLastStartupEvents()
|
||||
{
|
||||
//TODO: add other state events
|
||||
var commanderTask = context.Commander.OrderByDescending(x => x.Timestamp).FirstOrDefaultAsync();
|
||||
var materialsTask = context.Materials.OrderByDescending(x => x.Timestamp).FirstOrDefaultAsync();
|
||||
var rankTask = context.Rank.OrderByDescending(x => x.Timestamp).FirstOrDefaultAsync();
|
||||
var progressTask = context.Progress.OrderByDescending(x => x.Timestamp).FirstOrDefaultAsync();
|
||||
var reputationTask = context.Reputation.OrderByDescending(x => x.Timestamp).FirstOrDefaultAsync();
|
||||
var engineerProgressTask = context.EngineerProgress.OrderByDescending(x => x.Timestamp).FirstOrDefaultAsync();
|
||||
var loadGameTask = context.LoadGames.OrderByDescending(x => x.Timestamp).FirstOrDefaultAsync();
|
||||
var statisticsTask = context.Statistics.OrderByDescending(x => x.Timestamp).FirstOrDefaultAsync();
|
||||
|
||||
await Task.WhenAll(
|
||||
commanderTask,
|
||||
materialsTask,
|
||||
rankTask,
|
||||
progressTask,
|
||||
reputationTask,
|
||||
engineerProgressTask,
|
||||
loadGameTask,
|
||||
statisticsTask);
|
||||
|
||||
var commander = await commanderTask;
|
||||
var materials = await materialsTask;
|
||||
var rank = await rankTask;
|
||||
var progress = await progressTask;
|
||||
var reputation = await reputationTask;
|
||||
var engineerProgress = await engineerProgressTask;
|
||||
var loadGame = await loadGameTask;
|
||||
var statistics = await statisticsTask;
|
||||
|
||||
// if any null, return empty list
|
||||
if (materials == null || rank == null || progress == null || reputation == null || engineerProgress == null ||
|
||||
loadGame == null || statistics == null || commander == null)
|
||||
{
|
||||
return [];
|
||||
}
|
||||
|
||||
// dont check the time of statistics as it may occur a few moments after
|
||||
if (commander.Timestamp != materials.Timestamp ||
|
||||
materials.Timestamp != rank.Timestamp ||
|
||||
rank.Timestamp != progress.Timestamp ||
|
||||
progress.Timestamp != reputation.Timestamp ||
|
||||
reputation.Timestamp != engineerProgress.Timestamp ||
|
||||
engineerProgress.Timestamp != loadGame.Timestamp ||
|
||||
statistics.Timestamp < materials.Timestamp)
|
||||
{
|
||||
throw new InvalidOperationException("Timestamps do not match");
|
||||
}
|
||||
|
||||
return [commander, materials, rank, progress, reputation, engineerProgress, loadGame, statistics];
|
||||
}
|
||||
|
||||
public async Task<List<JournalBase>> Get()
|
||||
{
|
||||
return [];
|
||||
|
24
Pulsar/Features/Journal/JournalStore.cs
Normal file
24
Pulsar/Features/Journal/JournalStore.cs
Normal file
@ -0,0 +1,24 @@
|
||||
namespace Pulsar.Features.Journal;
|
||||
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
public interface IJournalStore
|
||||
{
|
||||
void EnqueueFile(string filePath);
|
||||
bool TryDequeue(out string filePath);
|
||||
}
|
||||
|
||||
public class JournalStore : IJournalStore
|
||||
{
|
||||
private readonly ConcurrentQueue<string> JournalFileQueue = new();
|
||||
|
||||
public void EnqueueFile(string filePath)
|
||||
{
|
||||
JournalFileQueue.Enqueue(filePath);
|
||||
}
|
||||
|
||||
public bool TryDequeue(out string filePath)
|
||||
{
|
||||
return JournalFileQueue.TryDequeue(out filePath);
|
||||
}
|
||||
}
|
@ -4,7 +4,11 @@ using Observatory.Framework.Files;
|
||||
|
||||
public interface INavRouteService : IJournalHandler<NavRouteFile>;
|
||||
|
||||
public class NavRouteService(IOptions<PulsarConfiguration> options, ILogger<NavRouteService> logger, IEventHubContext hub) : INavRouteService
|
||||
public class NavRouteService(
|
||||
IOptions<PulsarConfiguration> options,
|
||||
ILogger<NavRouteService> logger,
|
||||
IEventHubContext hub)
|
||||
: INavRouteService
|
||||
{
|
||||
public async Task<NavRouteFile> Get()
|
||||
{
|
||||
|
@ -4,7 +4,11 @@ using Observatory.Framework.Files;
|
||||
|
||||
public interface IOutfittingService : IJournalHandler<OutfittingFile>;
|
||||
|
||||
public class OutfittingService(IOptions<PulsarConfiguration> options, IEventHubContext hub, ILogger<OutfittingService> logger) : IOutfittingService
|
||||
public class OutfittingService(
|
||||
IOptions<PulsarConfiguration> options,
|
||||
IEventHubContext hub,
|
||||
ILogger<OutfittingService> logger)
|
||||
: IOutfittingService
|
||||
{
|
||||
public async Task<OutfittingFile> Get()
|
||||
{
|
||||
|
@ -4,7 +4,9 @@ using Observatory.Framework.Files.Journal.Odyssey;
|
||||
|
||||
public interface IShipLockerService : IJournalHandler<ShipLockerMaterials>;
|
||||
|
||||
public class ShipLockerService(ILogger<ShipLockerService> logger, IOptions<PulsarConfiguration> options,
|
||||
public class ShipLockerService(
|
||||
ILogger<ShipLockerService> logger,
|
||||
IOptions<PulsarConfiguration> options,
|
||||
IEventHubContext hub)
|
||||
: IShipLockerService
|
||||
{
|
||||
|
Reference in New Issue
Block a user