Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
137 changes: 119 additions & 18 deletions 137 common/src/main/kotlin/com/lambda/event/EventFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,20 @@ import com.lambda.event.callback.ICancellable
import com.lambda.event.listener.Listener
import com.lambda.threading.runConcurrent
import com.lambda.threading.runSafe
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout


/**
Expand All @@ -28,12 +39,35 @@ object EventFlow {
* useful when you have multiple independent [Job]s running in parallel.
*/
val lambdaScope = CoroutineScope(Dispatchers.Default + SupervisorJob())

/**
* [concurrentFlow] is a [MutableSharedFlow] of [Event]s with a buffer capacity to handle event emissions.
*
* Events emitted to this flow are processed by concurrent listeners, allowing for parallel event handling.
*
* The buffer overflow strategy is set to [BufferOverflow.DROP_OLDEST], meaning that when the buffer is full,
* the oldest event will be dropped to accommodate a new event.
*/
val concurrentFlow = MutableSharedFlow<Event>(
extraBufferCapacity = 1000,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)

/**
* [syncListeners] is a [Subscriber] that manages synchronous listeners.
*
* These listeners will be executed immediately when an event is posted, allowing for immediate responses to events.
* The [syncListeners] are stored in a [Subscriber] object, which is a specialized [ConcurrentHashMap] that manages sets of [Listener]s for different [Event] types.
*/
val syncListeners = Subscriber()

/**
* [concurrentListeners] is a [Subscriber] that manages asynchronous listeners.
*
* These listeners will be executed in parallel, each on a dedicated coroutine,
* allowing for concurrent processing of events.
* The [concurrentListeners] are stored in a [Subscriber] object, which is a specialized [ConcurrentHashMap] that manages sets of [Listener]s for different [Event] types.
*/
val concurrentListeners = Subscriber()

init {
Expand All @@ -49,28 +83,60 @@ object EventFlow {
}
}

suspend inline fun <reified E : Event> awaitEvent(
/**
* Suspends until an event of type [E] is received that satisfies the given [predicate].
*
* @param E The type of the event to wait for. This should be a subclass of [Event].
* @param predicate A lambda to test if the event satisfies the condition.
* @return The first event that matches the predicate.
*/
suspend inline fun <reified E : Event> blockUntilEvent(
noinline predicate: SafeContext.(E) -> Boolean = { true },
) = concurrentFlow.filterIsInstance<E>().first {
runSafe {
predicate(it)
} ?: false
}

suspend inline fun <reified E : Event> awaitEventUnsafe(
noinline predicate: (E) -> Boolean = { true },
) = concurrentFlow.filterIsInstance<E>().first(predicate)

suspend inline fun <reified E : Event> awaitEvent(
/**
* Suspends until an event of type [E] is received that satisfies the given [predicate],
* or until the specified [timeout] occurs.
*
* @param E The type of the event to wait for. This should be a subclass of [Event].
* @param timeout The maximum time to wait for the event, in milliseconds.
* @param predicate A lambda to test if the event satisfies the condition.
* @return The first event that matches the predicate or throws a timeout exception if not found.
*/
suspend inline fun <reified E : Event> blockUntilEvent(
timeout: Long,
noinline predicate: (E) -> Boolean = { true },
) = runBlocking {
withTimeout(timeout) {
concurrentFlow.filterIsInstance<E>().first(predicate)
}
withTimeout(timeout) {
concurrentFlow.filterIsInstance<E>().first(predicate)
}
}

suspend inline fun <reified E : Event> awaitEvents(
/**
* Suspends until an event of type [E] is received that satisfies the given [predicate].
*
* This method is "unsafe" in the sense that it does not execute the predicate within a [SafeContext].
*
* @param E The type of the event to wait for. This should be a subclass of [Event].
* @param predicate A lambda to test if the event satisfies the condition.
* @return The first event that matches the predicate.
*/
suspend inline fun <reified E : Event> blockUntilUnsafeEvent(
noinline predicate: (E) -> Boolean = { true },
) = concurrentFlow.filterIsInstance<E>().first(predicate)

/**
* Returns a [Flow] of events of type [E] that satisfy the given [predicate].
*
* @param E The type of the event to filter. This should be a subclass of [Event].
* @param predicate A lambda to test if the event satisfies the condition.
* @return A [Flow] emitting events that match the predicate.
*/
suspend inline fun <reified E : Event> collectEvents(
crossinline predicate: (E) -> Boolean = { true },
): Flow<E> = flow {
concurrentFlow
Expand Down Expand Up @@ -149,23 +215,58 @@ object EventFlow {
concurrentListeners.remove(T::class)
}

private fun Event.executeListenerSynchronous() {
syncListeners[this::class]?.forEach { listener ->
/**
* Executes the listeners for the current event type synchronously.
*
* This method retrieves the list of synchronous listeners for the event's class
* and invokes their [Listener.execute] method if the listener should be notified.
*
* @receiver The current event for which listeners are to be executed.
* @param T The type of the event being handled.
*/
private fun <T : Event> T.executeListenerSynchronous() {
syncListeners[this::class]?.forEach {
@Suppress("UNCHECKED_CAST")
val listener = it as? Listener<T> ?: return@forEach
if (shouldNotNotify(listener, this)) return@forEach
listener.execute(this)
}
}

private fun Event.executeListenerConcurrently() {
concurrentListeners[this::class]?.forEach { listener ->
/**
* Executes the listeners for the current event type concurrently.
*
* This method retrieves the list of concurrent listeners for the event's class
* and invokes their [Listener.execute] method if the listener should be notified.
* Each listener is executed on the same coroutine scope.
*
* @receiver The current event for which listeners are to be executed.
* @param T The type of the event being handled.
*/
private fun <T : Event> T.executeListenerConcurrently() {
concurrentListeners[this::class]?.forEach {
@Suppress("UNCHECKED_CAST")
val listener = it as? Listener<T> ?: return@forEach
if (shouldNotNotify(listener, this)) return@forEach
listener.execute(this)
}
}

private fun shouldNotNotify(listener: Listener, event: Event) =
/**
* Determines whether a given [listener] should be notified about an [event].
*
* A listener should not be notified if:
* - The listener's owner is a [Muteable] and is currently muted, unless the listener is set to [alwaysListen].
* - The event is cancellable and has been canceled.
*
* @param listener The listener to check.
* @param event The event being processed.
* @param T The type of the event.
* @return `true` if the listener should not be notified, `false` otherwise.
*/
private fun <T : Event> shouldNotNotify(listener: Listener<T>, event: Event) =
listener.owner is Muteable
&& (listener.owner as Muteable).isMuted
&& !listener.alwaysListen
|| event is ICancellable && event.isCanceled()
}
}
34 changes: 15 additions & 19 deletions 34 common/src/main/kotlin/com/lambda/event/Subscriber.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,34 @@ import kotlin.reflect.KClass
*
* @property defaultListenerSet A [ConcurrentSkipListSet] of [Listener]s, sorted in reverse order.
*/
class Subscriber : ConcurrentHashMap<KClass<*>, ConcurrentSkipListSet<Listener>>() {
val defaultListenerSet: ConcurrentSkipListSet<Listener>
get() = ConcurrentSkipListSet(Comparator.reverseOrder())
class Subscriber : ConcurrentHashMap<KClass<out Event>, ConcurrentSkipListSet<Listener<out Event>>>() {
val defaultListenerSet: ConcurrentSkipListSet<Listener<out Event>>
get() = ConcurrentSkipListSet(Listener.comparator.reversed())


/** Allows a [Listener] to start receiving a specific type of [Event] */
inline fun <reified T : Event> subscribe(listener: Listener) =
inline fun <reified T : Event> subscribe(listener: Listener<T>) =
getOrPut(T::class) { defaultListenerSet }.add(listener)


/** Forgets about every [Listener]s association to [eventType] */
fun unsubscribe(eventType: KClass<*>) = remove(eventType)

/** Allows a [Listener] to stop receiving a specific type of [Event] */
fun unsubscribe(listener: Listener) {
values.forEach { listeners ->
listeners.remove(listener)
}
}

/** Allows a [Subscriber] to start receiving all [Event]s of another [Subscriber]. */
infix fun subscribe(subscriber: Subscriber) {
subscriber.forEach { (eventType, listeners) ->
getOrPut(eventType) { defaultListenerSet }.addAll(listeners)
}
}

/** Forgets about every [Listener]'s association to [eventType] */
fun <T : Event> unsubscribe(eventType: KClass<T>) =
remove(eventType)

/** Allows a [Listener] to stop receiving a specific type of [Event] */
inline fun <reified T : Event> unsubscribe(listener: Listener<T>) =
getOrElse(T::class) { defaultListenerSet }.remove(listener)

/** Allows a [Subscriber] to stop receiving all [Event]s of another [Subscriber] */
infix fun unsubscribe(subscriber: Subscriber) {
entries.removeAll { (eventType, listeners) ->
subscriber[eventType]?.let { listeners.removeAll(it) }
listeners.isEmpty()
subscriber.forEach { (eventType, listeners) ->
getOrElse(eventType) { defaultListenerSet }.removeAll(listeners)
}
}
}
}
26 changes: 11 additions & 15 deletions 26 common/src/main/kotlin/com/lambda/event/listener/Listener.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import com.lambda.module.Module
* @property owner The owner of the [Listener]. This is typically the object that created the [Listener].
* @property alwaysListen If true, the [Listener] will always be triggered, even if the [owner] is [Muteable.isMuted].
*/
abstract class Listener : Comparable<Listener> {
abstract class Listener<T : Event> : Comparable<Listener<T>> {
abstract val priority: Int
abstract val owner: Any
abstract val alwaysListen: Boolean
Expand All @@ -37,21 +37,17 @@ abstract class Listener : Comparable<Listener> {
*
* @param event The event that triggered this listener.
*/
abstract fun execute(event: Event)
abstract fun execute(event: T)

/**
* Compares this listener with another listener.
* The comparison is based first on the priority, and then on the hash code of the listeners.
*
* @param other The other listener to compare with.
* @return A negative integer, zero, or a positive integer as this listener is less than, equal to,
* or greater than the specified listener.
*/
override fun compareTo(other: Listener) =
compareBy<Listener> {
override fun compareTo(other: Listener<T>) =
comparator.compare(this, other)

companion object {
val comparator = compareBy<Listener<out Event>> {
it.priority
}.thenBy {
// Needed because ConcurrentSkipListSet handles insertion based on compareTo
// Hashcode is needed because ConcurrentSkipListSet handles insertion based on compareTo
it.hashCode()
}.compare(this, other)
}
}
}
}
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.