From a4c485a7de9b3f64f9ad55b446bfcf8e6ade8a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Hru=C5=A1ka?= Date: Wed, 21 May 2014 20:27:21 +0200 Subject: [PATCH] Better concurrent safety in EventBus --- .../gamecore/eventbus/BufferedHashSet.java | 153 ------------------ .../gamecore/eventbus/EventBus.java | 11 +- 2 files changed, 5 insertions(+), 159 deletions(-) delete mode 100644 src/mightypork/gamecore/eventbus/BufferedHashSet.java diff --git a/src/mightypork/gamecore/eventbus/BufferedHashSet.java b/src/mightypork/gamecore/eventbus/BufferedHashSet.java deleted file mode 100644 index 31fadc1..0000000 --- a/src/mightypork/gamecore/eventbus/BufferedHashSet.java +++ /dev/null @@ -1,153 +0,0 @@ -package mightypork.gamecore.eventbus; - - -import java.util.Collection; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; - - -/** - * HashSet that buffers "add" and "remove" calls and performs them all at once - * when a flush() method is called. - * - * @author MightyPork - * @param element type - */ -public class BufferedHashSet extends HashSet { - - private final List toAdd = new LinkedList<>(); - private final List toRemove = new LinkedList<>(); - private boolean buffering = false; - - - /** - * make empty - */ - public BufferedHashSet() - { - super(); - } - - - /** - * make from elements of a collection - * - * @param c - */ - public BufferedHashSet(Collection c) - { - super(c); - } - - - /** - * make new - * - * @param initialCapacity - * @param loadFactor - */ - public BufferedHashSet(int initialCapacity, float loadFactor) - { - super(initialCapacity, loadFactor); - } - - - /** - * make new - * - * @param initialCapacity - */ - public BufferedHashSet(int initialCapacity) - { - super(initialCapacity); - } - - - @Override - public boolean add(E e) - { - if (buffering) { - toAdd.add(e); - } else { - super.add(e); - } - - return true; - } - - - @Override - public boolean remove(Object e) - { - if (buffering) { - toRemove.add(e); - } else { - super.remove(e); - } - - return true; - } - - - /** - * Flush all - */ - public void flush() - { - for (final E e : toAdd) { - super.add(e); - } - - for (final Object e : toRemove) { - super.remove(e); - } - - toAdd.clear(); - toRemove.clear(); - } - - - @Override - public boolean removeAll(Collection c) - { - throw new UnsupportedOperationException(); - } - - - @Override - public boolean containsAll(Collection c) - { - throw new UnsupportedOperationException(); - } - - - @Override - public boolean addAll(Collection c) - { - throw new UnsupportedOperationException(); - } - - - @Override - public boolean retainAll(Collection c) - { - throw new UnsupportedOperationException(); - } - - - /** - * Toggle buffering - * - * @param enable enable buffering - */ - public void setBuffering(boolean enable) - { - if (this.buffering && !enable) { - flush(); - } - - this.buffering = enable; - } - -} diff --git a/src/mightypork/gamecore/eventbus/EventBus.java b/src/mightypork/gamecore/eventbus/EventBus.java index 2a851cb..0eb6f91 100644 --- a/src/mightypork/gamecore/eventbus/EventBus.java +++ b/src/mightypork/gamecore/eventbus/EventBus.java @@ -4,6 +4,9 @@ package mightypork.gamecore.eventbus; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; @@ -132,13 +135,13 @@ final public class EventBus implements Destroyable, BusAccess { private final QueuePollingThread busThread; /** Registered clients */ - private final BufferedHashSet clients = new BufferedHashSet<>(); + private final Set clients = Collections.newSetFromMap(new ConcurrentHashMap()); /** Whether the bus was destroyed */ private boolean dead = false; /** Message channels */ - private final ConcurrentLinkedDeque> channels = new ConcurrentLinkedDeque<>(); + private final Set> channels = Collections.newSetFromMap(new ConcurrentHashMap,Boolean>()); /** Messages queued for delivery */ private final DelayQueue sendQueue = new DelayQueue<>(); @@ -341,12 +344,8 @@ final public class EventBus implements Destroyable, BusAccess { { assertLive(); - clients.setBuffering(true); - doDispatch(clients, event); event.onDispatchComplete(this); - - clients.setBuffering(false); }