Fix campaign steps
This commit is contained in:
@ -29,6 +29,8 @@ namespace Hcs.WebApp.BackgroundServices
|
||||
{
|
||||
while (state.TryDequeueOperation(out var operation))
|
||||
{
|
||||
state.SetProcessingOperation(operation);
|
||||
|
||||
if (stoppingToken.IsCancellationRequested) return;
|
||||
|
||||
var scope = scopeFactory.CreateScope();
|
||||
@ -49,6 +51,8 @@ namespace Hcs.WebApp.BackgroundServices
|
||||
{
|
||||
await headquartersService.SetOperationMessageGuidAsync(operation.Id, messageGuid);
|
||||
}
|
||||
|
||||
state.UnsetProcessingOperation(operation);
|
||||
}
|
||||
|
||||
await Task.Delay(SLEEP_TIME, stoppingToken);
|
||||
|
||||
@ -5,21 +5,32 @@ namespace Hcs.WebApp.BackgroundServices
|
||||
{
|
||||
public class OperationExecutionState
|
||||
{
|
||||
private readonly ConcurrentQueue<Operation> operations = new();
|
||||
private readonly ConcurrentQueue<Operation> operationsInQueue = new();
|
||||
private readonly HashSet<Operation> operationsInProcess = [];
|
||||
|
||||
public void EnqueueOperation(Operation operation)
|
||||
{
|
||||
operations.Enqueue(operation);
|
||||
operationsInQueue.Enqueue(operation);
|
||||
}
|
||||
|
||||
public bool TryDequeueOperation(out Operation operation)
|
||||
{
|
||||
return operations.TryDequeue(out operation);
|
||||
return operationsInQueue.TryDequeue(out operation);
|
||||
}
|
||||
|
||||
public void SetProcessingOperation(Operation operation)
|
||||
{
|
||||
operationsInProcess.Add(operation);
|
||||
}
|
||||
|
||||
public void UnsetProcessingOperation(Operation operation)
|
||||
{
|
||||
operationsInProcess.Remove(operation);
|
||||
}
|
||||
|
||||
public bool HasCampaignOperation(int campaignId)
|
||||
{
|
||||
return operations.Any(x => x.CampaignId == campaignId);
|
||||
return operationsInProcess.Any(x => x.CampaignId == campaignId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,6 +42,8 @@ namespace Hcs.WebApp.BackgroundServices
|
||||
{
|
||||
while (state.TryDequeueOperation(out var operation))
|
||||
{
|
||||
state.SetProcessingOperation(operation);
|
||||
|
||||
if (stoppingToken.IsCancellationRequested) return;
|
||||
|
||||
entries.Add(new (operation, new WaitState()));
|
||||
@ -86,6 +88,11 @@ namespace Hcs.WebApp.BackgroundServices
|
||||
entry.state.attempt++;
|
||||
entry.state.timer = 0;
|
||||
}
|
||||
|
||||
if (entry.state.done)
|
||||
{
|
||||
state.UnsetProcessingOperation(entry.operation);
|
||||
}
|
||||
}
|
||||
|
||||
entries.RemoveAll(x => x.state.done);
|
||||
|
||||
@ -5,21 +5,32 @@ namespace Hcs.WebApp.BackgroundServices
|
||||
{
|
||||
public class ResultWaitState
|
||||
{
|
||||
private readonly ConcurrentQueue<Operation> operations = new();
|
||||
private readonly ConcurrentQueue<Operation> operationsInQueue = new();
|
||||
private readonly HashSet<Operation> operationsInProcess = [];
|
||||
|
||||
public void EnqueueOperation(Operation operation)
|
||||
{
|
||||
operations.Enqueue(operation);
|
||||
operationsInQueue.Enqueue(operation);
|
||||
}
|
||||
|
||||
public bool TryDequeueOperation(out Operation operation)
|
||||
{
|
||||
return operations.TryDequeue(out operation);
|
||||
return operationsInQueue.TryDequeue(out operation);
|
||||
}
|
||||
|
||||
public void SetProcessingOperation(Operation operation)
|
||||
{
|
||||
operationsInProcess.Add(operation);
|
||||
}
|
||||
|
||||
public void UnsetProcessingOperation(Operation operation)
|
||||
{
|
||||
operationsInProcess.Remove(operation);
|
||||
}
|
||||
|
||||
public bool HasCampaignOperation(int campaignId)
|
||||
{
|
||||
return operations.Any(x => x.CampaignId == campaignId);
|
||||
return operationsInProcess.Any(x => x.CampaignId == campaignId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user