Better concurrent safety in EventBus

v5stable
Ondřej Hruška 11 years ago
parent 15041d8ed2
commit a4c485a7de
  1. 153
      src/mightypork/gamecore/eventbus/BufferedHashSet.java
  2. 11
      src/mightypork/gamecore/eventbus/EventBus.java

@ -1,153 +0,0 @@
package mightypork.gamecore.eventbus;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
/**
* HashSet that buffers "add" and "remove" calls and performs them all at once
* when a flush() method is called.
*
* @author MightyPork
* @param <E> element type
*/
public class BufferedHashSet<E> extends HashSet<E> {
private final List<E> toAdd = new LinkedList<>();
private final List<Object> toRemove = new LinkedList<>();
private boolean buffering = false;
/**
* make empty
*/
public BufferedHashSet()
{
super();
}
/**
* make from elements of a collection
*
* @param c
*/
public BufferedHashSet(Collection<? extends E> c)
{
super(c);
}
/**
* make new
*
* @param initialCapacity
* @param loadFactor
*/
public BufferedHashSet(int initialCapacity, float loadFactor)
{
super(initialCapacity, loadFactor);
}
/**
* make new
*
* @param initialCapacity
*/
public BufferedHashSet(int initialCapacity)
{
super(initialCapacity);
}
@Override
public boolean add(E e)
{
if (buffering) {
toAdd.add(e);
} else {
super.add(e);
}
return true;
}
@Override
public boolean remove(Object e)
{
if (buffering) {
toRemove.add(e);
} else {
super.remove(e);
}
return true;
}
/**
* Flush all
*/
public void flush()
{
for (final E e : toAdd) {
super.add(e);
}
for (final Object e : toRemove) {
super.remove(e);
}
toAdd.clear();
toRemove.clear();
}
@Override
public boolean removeAll(Collection<?> c)
{
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(Collection<?> c)
{
throw new UnsupportedOperationException();
}
@Override
public boolean addAll(Collection<? extends E> c)
{
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(Collection<?> c)
{
throw new UnsupportedOperationException();
}
/**
* Toggle buffering
*
* @param enable enable buffering
*/
public void setBuffering(boolean enable)
{
if (this.buffering && !enable) {
flush();
}
this.buffering = enable;
}
}

@ -4,6 +4,9 @@ package mightypork.gamecore.eventbus;
import java.lang.reflect.ParameterizedType; import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
@ -132,13 +135,13 @@ final public class EventBus implements Destroyable, BusAccess {
private final QueuePollingThread busThread; private final QueuePollingThread busThread;
/** Registered clients */ /** Registered clients */
private final BufferedHashSet<Object> clients = new BufferedHashSet<>(); private final Set<Object> clients = Collections.newSetFromMap(new ConcurrentHashMap<Object,Boolean>());
/** Whether the bus was destroyed */ /** Whether the bus was destroyed */
private boolean dead = false; private boolean dead = false;
/** Message channels */ /** Message channels */
private final ConcurrentLinkedDeque<EventChannel<?, ?>> channels = new ConcurrentLinkedDeque<>(); private final Set<EventChannel<?, ?>> channels = Collections.newSetFromMap(new ConcurrentHashMap<EventChannel<?, ?>,Boolean>());
/** Messages queued for delivery */ /** Messages queued for delivery */
private final DelayQueue<DelayQueueEntry> sendQueue = new DelayQueue<>(); private final DelayQueue<DelayQueueEntry> sendQueue = new DelayQueue<>();
@ -341,12 +344,8 @@ final public class EventBus implements Destroyable, BusAccess {
{ {
assertLive(); assertLive();
clients.setBuffering(true);
doDispatch(clients, event); doDispatch(clients, event);
event.onDispatchComplete(this); event.onDispatchComplete(this);
clients.setBuffering(false);
} }

Loading…
Cancel
Save