Skip to content

Publish Queued Messages

var iHandleTypes = queuedMessages.Select(i => i.Key).Distinct().ToArray();

// the message-type and the receiver weak-reference
KeyValuePair<Type, WeakReference>[] toNotify;
lock (subscribers)
{
    // filter messages according to the given receiver
    toNotify = (from s in subscribers.Where(i => i.IsAlive && i.Target == receiver)
        let types = s.Target.GetType().GetInterfaces()
            .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof (IHandle<>))
            .ToArray()
    let qTypes = iHandleTypes.Where(i => types.Any(i.IsAssignableFrom)).ToArray()
    //let qType = iHandleTypes.FirstOrDefault(i => types.Any(i.IsAssignableFrom))
    select new {qTypes, s})
    .Where(i => i.qTypes.Length > 0)
    .SelectMany(i => i.qTypes.Select(j => new KeyValuePair<Type, WeakReference>(j, i.s)))
    .ToArray();
}

foreach (var recItem in toNotify)
{
    if (recItem.Value.IsAlive == false)
    {
        lock (subscribers)
        {
            subscribers.Remove(recItem.Value);
            continue;
        }
    }
    List<WeakReference> weakList;
    if (this.queuedMessages.TryGetValue(recItem.Key, out weakList))
    {
        if (weakList.Count == 0)
        {
            this.queuedMessages.TryRemove(recItem.Key, out weakList);
            continue;
        }

        PerformPublishQueuedMessage(recItem, weakList);

        if (weakList.Count == 0)
        {
            this.queuedMessages.TryRemove(recItem.Key, out weakList);
            continue;
        }

    }
}

// ..

/// <summary>
/// publish, countdown, dispose messages
/// </summary>
/// <param name="receiver"></param>
/// <param name="weakMsg"></param>
private void PerformPublishQueuedMessage(KeyValuePair<Type, WeakReference> receiver, List<WeakReference> weakMsg)
{
    foreach (var qMsgRec in weakMsg.ToArray())
    {
        //queuedMessages.TryGetValue()
        if (!receiver.Value.IsAlive || !qMsgRec.IsAlive)
        {
            if (qMsgRec.IsAlive == false)
                weakMsg.Remove(qMsgRec);
            break;
        }

        var qMsg = (IQueued)qMsgRec.Target;
        qMsg.MaxTimes--;
        receiver.Value.Target.GetType().GetMethod("Handle", new[] { qMsgRec.Target.GetType() })
            //.MakeGenericMethod(receiver.Key)
            .Invoke(receiver.Value.Target, new[] { qMsgRec.Target });

        if (qMsg.MaxTimes < 1)
        {
            weakMsg.Remove(qMsgRec);
// ReSharper disable once SuspiciousTypeConversion.Global
            if (qMsg is IDisposable dMsg)
                dMsg.Dispose();
        }
    }
}

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert