You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
397 lines
8.1 KiB
397 lines
8.1 KiB
11 years ago
|
package mightypork.util.control.eventbus;
|
||
11 years ago
|
|
||
11 years ago
|
|
||
11 years ago
|
import java.util.Collection;
|
||
11 years ago
|
import java.util.concurrent.DelayQueue;
|
||
|
import java.util.concurrent.Delayed;
|
||
|
import java.util.concurrent.TimeUnit;
|
||
11 years ago
|
|
||
11 years ago
|
import mightypork.util.annotations.FactoryMethod;
|
||
|
import mightypork.util.control.Destroyable;
|
||
|
import mightypork.util.control.eventbus.clients.DelegatingClient;
|
||
|
import mightypork.util.control.eventbus.events.Event;
|
||
|
import mightypork.util.control.eventbus.events.flags.DelayedEvent;
|
||
|
import mightypork.util.control.eventbus.events.flags.ImmediateEvent;
|
||
|
import mightypork.util.control.eventbus.events.flags.SingleReceiverEvent;
|
||
|
import mightypork.util.control.eventbus.events.flags.UnloggedEvent;
|
||
|
import mightypork.util.logging.Log;
|
||
11 years ago
|
|
||
|
|
||
|
/**
|
||
11 years ago
|
* An event bus, accommodating multiple {@link EventChannel}s.
|
||
11 years ago
|
*
|
||
|
* @author MightyPork
|
||
|
*/
|
||
11 years ago
|
final public class EventBus implements Destroyable {
|
||
11 years ago
|
|
||
11 years ago
|
/** Message channels */
|
||
11 years ago
|
private final BufferedHashSet<EventChannel<?, ?>> channels = new BufferedHashSet<>();
|
||
11 years ago
|
|
||
|
/** Registered clients */
|
||
11 years ago
|
private final BufferedHashSet<Object> clients = new BufferedHashSet<>();
|
||
11 years ago
|
|
||
|
/** Messages queued for delivery */
|
||
11 years ago
|
private final DelayQueue<DelayQueueEntry> sendQueue = new DelayQueue<>();
|
||
11 years ago
|
|
||
11 years ago
|
/** Queue polling thread */
|
||
11 years ago
|
private final QueuePollingThread busThread;
|
||
11 years ago
|
|
||
11 years ago
|
/** Whether the bus was destroyed */
|
||
11 years ago
|
private boolean dead = false;
|
||
11 years ago
|
|
||
11 years ago
|
/** Log detailed messages (debug) */
|
||
11 years ago
|
public boolean detailedLogging = false;
|
||
11 years ago
|
|
||
11 years ago
|
|
||
11 years ago
|
/**
|
||
|
* Make a new bus and start it's queue thread.
|
||
|
*/
|
||
11 years ago
|
public EventBus() {
|
||
11 years ago
|
busThread = new QueuePollingThread();
|
||
11 years ago
|
busThread.setDaemon(true);
|
||
11 years ago
|
busThread.start();
|
||
11 years ago
|
}
|
||
|
|
||
11 years ago
|
|
||
|
private boolean shallLog(Event<?> event)
|
||
|
{
|
||
11 years ago
|
if (!detailedLogging) return false;
|
||
11 years ago
|
if (event.getClass().isAnnotationPresent(UnloggedEvent.class)) return false;
|
||
11 years ago
|
|
||
|
return true;
|
||
|
}
|
||
|
|
||
11 years ago
|
|
||
11 years ago
|
/**
|
||
11 years ago
|
* Add a {@link EventChannel} to this bus.<br>
|
||
11 years ago
|
* If a channel of matching types is already added, it is returned instead.
|
||
|
*
|
||
11 years ago
|
* @param channel channel to be added
|
||
11 years ago
|
* @return the channel that's now in the bus
|
||
|
*/
|
||
11 years ago
|
public EventChannel<?, ?> addChannel(EventChannel<?, ?> channel)
|
||
11 years ago
|
{
|
||
11 years ago
|
assertLive();
|
||
|
|
||
11 years ago
|
// if the channel already exists, return this instance instead.
|
||
11 years ago
|
for (final EventChannel<?, ?> ch : channels) {
|
||
11 years ago
|
if (ch.equals(channel)) {
|
||
11 years ago
|
Log.w("<bus> Channel of type " + Log.str(channel) + " already registered.");
|
||
11 years ago
|
return ch;
|
||
|
}
|
||
|
}
|
||
11 years ago
|
|
||
11 years ago
|
channels.add(channel);
|
||
11 years ago
|
|
||
11 years ago
|
return channel;
|
||
|
}
|
||
11 years ago
|
|
||
|
|
||
11 years ago
|
/**
|
||
11 years ago
|
* Make & connect a channel for given event and client type.
|
||
11 years ago
|
*
|
||
11 years ago
|
* @param eventClass event type
|
||
11 years ago
|
* @param clientClass client type
|
||
|
* @return the created channel instance
|
||
|
*/
|
||
11 years ago
|
@FactoryMethod
|
||
11 years ago
|
public <F_EVENT extends Event<F_CLIENT>, F_CLIENT> EventChannel<?, ?> addChannel(Class<F_EVENT> eventClass, Class<F_CLIENT> clientClass)
|
||
11 years ago
|
{
|
||
|
assertLive();
|
||
|
|
||
11 years ago
|
final EventChannel<F_EVENT, F_CLIENT> channel = EventChannel.create(eventClass, clientClass);
|
||
11 years ago
|
return addChannel(channel);
|
||
|
}
|
||
|
|
||
|
|
||
11 years ago
|
/**
|
||
11 years ago
|
* Remove a {@link EventChannel} from this bus
|
||
11 years ago
|
*
|
||
11 years ago
|
* @param channel true if channel was removed
|
||
11 years ago
|
*/
|
||
11 years ago
|
public void removeChannel(EventChannel<?, ?> channel)
|
||
11 years ago
|
{
|
||
11 years ago
|
assertLive();
|
||
|
|
||
11 years ago
|
channels.remove(channel);
|
||
|
}
|
||
11 years ago
|
|
||
|
|
||
11 years ago
|
/**
|
||
11 years ago
|
* Send based on annotation.clients
|
||
11 years ago
|
*
|
||
11 years ago
|
* @param event event
|
||
11 years ago
|
*/
|
||
11 years ago
|
public void send(Event<?> event)
|
||
11 years ago
|
{
|
||
11 years ago
|
assertLive();
|
||
11 years ago
|
|
||
11 years ago
|
final DelayedEvent adelay = event.getClass().getAnnotation(DelayedEvent.class);
|
||
11 years ago
|
if (adelay != null) {
|
||
|
sendDelayed(event, adelay.delay());
|
||
11 years ago
|
return;
|
||
|
}
|
||
|
|
||
11 years ago
|
if (event.getClass().isAnnotationPresent(ImmediateEvent.class)) {
|
||
11 years ago
|
sendDirect(event);
|
||
|
return;
|
||
|
}
|
||
|
|
||
11 years ago
|
sendQueued(event);
|
||
11 years ago
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
11 years ago
|
* Add event to a queue
|
||
11 years ago
|
*
|
||
11 years ago
|
* @param event event
|
||
11 years ago
|
*/
|
||
11 years ago
|
public void sendQueued(Event<?> event)
|
||
11 years ago
|
{
|
||
|
assertLive();
|
||
11 years ago
|
|
||
11 years ago
|
sendDelayed(event, 0);
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Add event to a queue, scheduled for given time.
|
||
|
*
|
||
|
* @param event event
|
||
|
* @param delay delay before event is dispatched
|
||
|
*/
|
||
|
public void sendDelayed(Event<?> event, double delay)
|
||
|
{
|
||
|
assertLive();
|
||
|
|
||
11 years ago
|
final DelayQueueEntry dm = new DelayQueueEntry(delay, event);
|
||
11 years ago
|
|
||
11 years ago
|
if (shallLog(event)) Log.f3("<bus> Qu " + Log.str(event) + ", t = +" + delay + "s");
|
||
11 years ago
|
|
||
11 years ago
|
sendQueue.add(dm);
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Send immediately.<br>
|
||
|
* Should be used for real-time events that require immediate response, such
|
||
|
* as timing events.
|
||
|
*
|
||
11 years ago
|
* @param event event
|
||
11 years ago
|
*/
|
||
11 years ago
|
public void sendDirect(Event<?> event)
|
||
|
{
|
||
|
assertLive();
|
||
|
|
||
11 years ago
|
if (shallLog(event)) Log.f3("<bus> Di " + Log.str(event));
|
||
11 years ago
|
|
||
|
dispatch(event);
|
||
|
}
|
||
|
|
||
|
|
||
11 years ago
|
public void sendDirectToChildren(DelegatingClient delegatingClient, Event<?> event)
|
||
|
{
|
||
|
assertLive();
|
||
|
|
||
|
if (shallLog(event)) Log.f3("<bus> Di sub " + Log.str(event));
|
||
|
|
||
|
doDispatch(delegatingClient.getChildClients(), event);
|
||
|
}
|
||
|
|
||
|
|
||
11 years ago
|
/**
|
||
|
* Send immediately.<br>
|
||
|
* Should be used for real-time events that require immediate response, such
|
||
|
* as timing events.
|
||
|
*
|
||
|
* @param event event
|
||
|
*/
|
||
|
private void dispatch(Event<?> event)
|
||
11 years ago
|
{
|
||
|
assertLive();
|
||
|
|
||
11 years ago
|
channels.setBuffering(true);
|
||
|
clients.setBuffering(true);
|
||
|
|
||
11 years ago
|
doDispatch(clients, event);
|
||
|
|
||
|
channels.setBuffering(false);
|
||
|
clients.setBuffering(false);
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Send to a set of clients
|
||
|
*
|
||
|
* @param clients clients
|
||
|
* @param event event
|
||
|
*/
|
||
|
private void doDispatch(Collection<Object> clients, Event<?> event)
|
||
|
{
|
||
11 years ago
|
boolean sent = false;
|
||
|
boolean accepted = false;
|
||
|
|
||
|
final boolean singular = event.getClass().isAnnotationPresent(SingleReceiverEvent.class);
|
||
|
|
||
|
for (final EventChannel<?, ?> b : channels) {
|
||
|
if (b.canBroadcast(event)) {
|
||
|
accepted = true;
|
||
|
sent |= b.broadcast(event, clients);
|
||
11 years ago
|
}
|
||
|
|
||
11 years ago
|
if (sent && singular) break;
|
||
11 years ago
|
}
|
||
11 years ago
|
|
||
|
if (!accepted) Log.e("<bus> Not accepted by any channel: " + Log.str(event));
|
||
|
if (!sent && shallLog(event)) Log.w("<bus> Not delivered: " + Log.str(event));
|
||
|
|
||
11 years ago
|
}
|
||
11 years ago
|
|
||
|
|
||
11 years ago
|
/**
|
||
11 years ago
|
* Connect a client to the bus. The client will be connected to all current
|
||
|
* and future channels, until removed from the bus.
|
||
11 years ago
|
*
|
||
11 years ago
|
* @param client the client
|
||
11 years ago
|
*/
|
||
11 years ago
|
public void subscribe(Object client)
|
||
11 years ago
|
{
|
||
11 years ago
|
assertLive();
|
||
|
|
||
|
if (client == null) return;
|
||
11 years ago
|
|
||
11 years ago
|
clients.add(client);
|
||
11 years ago
|
|
||
11 years ago
|
if (detailedLogging) Log.f3("<bus> Client joined: " + Log.str(client));
|
||
11 years ago
|
}
|
||
11 years ago
|
|
||
|
|
||
11 years ago
|
/**
|
||
|
* Disconnect a client from the bus.
|
||
|
*
|
||
11 years ago
|
* @param client the client
|
||
11 years ago
|
*/
|
||
|
public void unsubscribe(Object client)
|
||
11 years ago
|
{
|
||
11 years ago
|
assertLive();
|
||
|
|
||
11 years ago
|
clients.remove(client);
|
||
11 years ago
|
|
||
11 years ago
|
if (detailedLogging) Log.f3("<bus> Client left: " + Log.str(client));
|
||
11 years ago
|
}
|
||
|
|
||
|
|
||
11 years ago
|
/**
|
||
|
* Check if client can be accepted by any channel
|
||
|
*
|
||
|
* @param client tested client
|
||
|
* @return would be accepted
|
||
|
*/
|
||
11 years ago
|
public boolean isClientValid(Object client)
|
||
|
{
|
||
|
assertLive();
|
||
|
|
||
|
if (client == null) return false;
|
||
|
|
||
11 years ago
|
for (final EventChannel<?, ?> ch : channels) {
|
||
11 years ago
|
if (ch.isClientValid(client)) {
|
||
|
return true;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return false;
|
||
|
}
|
||
|
|
||
11 years ago
|
private class DelayQueueEntry implements Delayed {
|
||
11 years ago
|
|
||
11 years ago
|
private final long due;
|
||
11 years ago
|
private Event<?> evt = null;
|
||
11 years ago
|
|
||
|
|
||
11 years ago
|
public DelayQueueEntry(double seconds, Event<?> event) {
|
||
11 years ago
|
super();
|
||
|
this.due = System.currentTimeMillis() + (long) (seconds * 1000);
|
||
11 years ago
|
this.evt = event;
|
||
11 years ago
|
}
|
||
|
|
||
|
|
||
|
@Override
|
||
|
public int compareTo(Delayed o)
|
||
|
{
|
||
11 years ago
|
return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)).compareTo(o.getDelay(TimeUnit.MILLISECONDS));
|
||
11 years ago
|
}
|
||
|
|
||
|
|
||
|
@Override
|
||
|
public long getDelay(TimeUnit unit)
|
||
|
{
|
||
|
return unit.convert(due - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
|
||
|
}
|
||
|
|
||
|
|
||
11 years ago
|
public Event<?> getEvent()
|
||
11 years ago
|
{
|
||
11 years ago
|
return evt;
|
||
11 years ago
|
}
|
||
|
|
||
|
}
|
||
|
|
||
11 years ago
|
private class QueuePollingThread extends Thread {
|
||
|
|
||
|
public boolean stopped = false;
|
||
11 years ago
|
|
||
11 years ago
|
|
||
|
public QueuePollingThread() {
|
||
|
super("Queue Polling Thread");
|
||
|
}
|
||
11 years ago
|
|
||
|
|
||
|
@Override
|
||
|
public void run()
|
||
|
{
|
||
11 years ago
|
DelayQueueEntry evt;
|
||
11 years ago
|
|
||
11 years ago
|
while (!stopped) {
|
||
11 years ago
|
evt = null;
|
||
11 years ago
|
|
||
|
try {
|
||
11 years ago
|
evt = sendQueue.take();
|
||
11 years ago
|
} catch (final InterruptedException ignored) {
|
||
11 years ago
|
//
|
||
|
}
|
||
|
|
||
11 years ago
|
if (evt != null) {
|
||
|
dispatch(evt.getEvent());
|
||
11 years ago
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
11 years ago
|
}
|
||
11 years ago
|
|
||
|
|
||
11 years ago
|
/**
|
||
11 years ago
|
* Halt bus thread and reject any future events.
|
||
|
*/
|
||
|
@Override
|
||
|
public void destroy()
|
||
|
{
|
||
|
assertLive();
|
||
|
|
||
|
busThread.stopped = true;
|
||
|
|
||
|
dead = true;
|
||
|
}
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Make sure the bus is not destroyed.
|
||
11 years ago
|
*
|
||
11 years ago
|
* @throws IllegalStateException if the bus is dead.
|
||
11 years ago
|
*/
|
||
11 years ago
|
private void assertLive() throws IllegalStateException
|
||
11 years ago
|
{
|
||
11 years ago
|
if (dead) throw new IllegalStateException("EventBus is dead.");
|
||
11 years ago
|
}
|
||
11 years ago
|
|
||
11 years ago
|
}
|