I am losing my mind. Look at this code. Using concurrent queue
public void Save(T model)
{
_queue.Enqueue(model);
Debug.WriteLine("Enqueue " + model.ToString()); // <-- this changes everything!!!
StartProcess();
}
public void Flush()
{
Task? t = StartProcess();
if (t != null)
t.Wait();
}
private Task? StartProcess()
{
Task? t = null;
if (!_isSaveActivated)
{
lock (_lock) // ensure only one persisting loop running
{
if (!_isSaveActivated)
{
_isSaveActivated = true;
t = Task.Run(() => ExecuteProcess());
}
}
}
return t;
}
private void ExecuteProcess()
{
// this will run until queue is empty
// but lets break by 10 not to delay too much
int count = 0;
List<T> saveList = new List<T>();
try
{
while (_queue.TryDequeue(out T? item))
{
count++;
saveList.Add(item);
// if behind picked item there is empty queue, we should save as many items as there are
if (count == 10 || !_queue.TryPeek(out T? nextItem))
{
Save(saveList);
// reset values for the next 10-item loop
count = 0;
saveList.Clear();
}
}
}
catch (Exception ex)
{
// trace
}
finally
{
// unblock the next potential thread
_isSaveActivated = false;
}
void Save(IEnumerable<T> persistList) // local
{
_dbContextProxy.AddRange(saveList);
_dbContextProxy.Save();
}
}
And in the unit test, a simple loop
for (int i = 1; i <= totalCount; i++)
persister.Save(new TestModel1() { Id = i, Desc = "Item " + i });
Assert.AreEqual(633, proxy.TototalCount, "Total count should be {0}", 633);
Here is the freak show. I have the collector, every time _dbContextProxy.Save();
runs, total processed items is added to. When this line is commented out // Debug.WriteLine("Enqueue " + model.ToString()); <-- this changes everything!!!
my totals is 0 . But when I have that line uncommented - it works!! Both, RUN and Debug. And I did it like 20 times already - commented, uncommented and recompiled in between. And this behavior reproducible each time.
One thing I must add that this whole code being run from Task.Run(...)
somewhere down the stack.
What do you see? Help appreciated