sventon subversion web client - http://www.sventon.org
[show recent changes]
 
  Help
HEAD
Rev: 53085 - http://anonsvn.icesoft.org/repo / icenotify / trunk / icenotify / cloud / src / main / java / org / icesoft / notify / cloud / core / AbstractNotificationConsumer.java
Show File - AbstractNotificationConsumer.java  [show properties]
spinner
package org.icesoft.notify.cloud.core;
import static org.icesoft.util.MapUtilities.isNotNullAndIsNotEmpty;
import static org.icesoft.util.ObjectUtilities.isEqual;
import static org.icesoft.util.ObjectUtilities.isNotEqual;
import static org.icesoft.util.ObjectUtilities.isNotNull;
import static org.icesoft.util.PreCondition.checkIfIsNotNull;
import static org.icesoft.util.PreCondition.checkIfIsNotNullAndIsNotEmpty;
import static org.icesoft.util.concurrent.ReentrantLockUtilities.retainLocksHeldByCurrentThread;
10 
11  import java.util.Collections;
12  import java.util.HashSet;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Set;
16  import java.util.concurrent.TimeUnit;
17  import java.util.concurrent.locks.Condition;
18  import java.util.logging.Level;
19  import java.util.logging.Logger;
20 
21  import org.icesoft.util.Named;
22  import org.icesoft.util.StringUtilities;
23  import org.icesoft.util.concurrent.NamedReentrantLock;
24  import org.icesoft.util.concurrent.Passthrough;
25 
26  public abstract class AbstractNotificationConsumer
27  implements Named, NotificationConsumer, Runnable {
28      private static final Logger LOGGER = Logger.getLogger(AbstractNotificationConsumer.class.getName());
29 
30      private final Set<NamedReentrantLock> lockSet = new HashSet<NamedReentrantLock>();
31 
32      private final String name;
33      private final NotificationProvider notificationProvider;
34      private final List<Passthrough<NotificationData>> passthroughList;
35      private final Condition sharedCondition;
36      private final NamedReentrantLock sharedLock;
37 
38      private boolean running = false;
39 
40      protected AbstractNotificationConsumer(
41          final String name, final NamedReentrantLock sharedLock, final Condition sharedCondition,
42          final List<Passthrough<NotificationData>> passthroughList, final NotificationProvider notificationProvider)
43      throws IllegalArgumentException, NullPointerException {
44          this.name =
45              // throws IllegalArgumentException
46              checkIfIsNotNullAndIsNotEmpty(
47                  name,
48                  "Illegal argument name: '" + name + "'.  Argument cannot be null or empty."
49              );
50          this.sharedLock =
51              // throws NullPointerException
52              checkIfIsNotNull(
53                  sharedLock,
54                  "Illegal argument sharedLock: '" + sharedLock + "'.  Argument cannot be null."
55              );
56          this.sharedCondition =
57              // throws NullPointerException
58              checkIfIsNotNull(
59                  sharedCondition,
60                  "Illegal argument sharedCondition: '" + sharedCondition + "'.  Argument cannot be null."
61              );
62          this.passthroughList =
63              // throws IllegalArgumentException
64              checkIfIsNotNullAndIsNotEmpty(
65                  passthroughList,
66                  "Illegal argument passthroughList: '" + passthroughList + "'.  Argument cannot be null or empty."
67              );
68          this.notificationProvider =
69              // throws NullPointerException
70              checkIfIsNotNull(
71                  notificationProvider,
72                  "Illegal argument notificationProvider: '" + notificationProvider + "'.  Argument cannot be null."
73              );
74      }
75 
76      @Override
77      public boolean equals(final Object object) {
78          return
79              object instanceof AbstractNotificationConsumer &&
80                  isEqual(
81                      ((AbstractNotificationConsumer)object).getName(),
82                      getName()
83                  ) &&
84                  isEqual(
85                      ((AbstractNotificationConsumer)object).getNotificationProvider(),
86                      getNotificationProvider()
87                  ) &&
88                  isEqual(
89                      ((AbstractNotificationConsumer)object).getModifiablePassthroughList(),
90                      getModifiablePassthroughList()
91                  ) &&
92                  isEqual(
93                      ((AbstractNotificationConsumer)object).isRunning(),
94                      isRunning()
95                  ) &&
96                  isEqual(
97                      ((AbstractNotificationConsumer)object).getSharedCondition(),
98                      getSharedCondition()
99                  ) &&
100                  isEqual(
101                      ((AbstractNotificationConsumer)object).getSharedLock(),
102                      getSharedLock()
103                  );
104      }
105 
106      public String getName() {
107          return name;
108      }
109 
110      @Override
111      public int hashCode() {
112          int _hashCode;
113          _hashCode =
114              isNotNull(getName()) ? getName().hashCode() : 0;
115          _hashCode =
116              31 * _hashCode +
117                  (isNotNull(getNotificationProvider()) ? getNotificationProvider().hashCode() : 0);
118          _hashCode =
119              31 * _hashCode +
120                  (isNotNull(getModifiablePassthroughList()) ? getModifiablePassthroughList().hashCode() : 0);
121          _hashCode =
122              31 * _hashCode +
123                  (isRunning() ? Boolean.TRUE.hashCode() : Boolean.FALSE.hashCode());
124          _hashCode =
125              31 * _hashCode +
126                  (isNotNull(getSharedCondition()) ? getSharedCondition().hashCode() : 0);
127          _hashCode =
128              31 * _hashCode +
129                  (isNotNull(getSharedLock()) ? getSharedLock().hashCode() : 0);
130          return _hashCode;
131      }
132 
133      public boolean isRunning() {
134          return running;
135      }
136 
137      public void run() {
138          if (!isRunning()) {
139              setRunning(true);
140          }
141          while (isRunning()) {
142              try {
143                  // throws InterruptedException
144                  // throws InterruptedException
145                  LOGGER.info(
146                      "[Jack] " + getName() + " | Trying to lock " + getSharedLock().getName() + "..."
147                  );
148                  tryLockSharedLock();
149                  LOGGER.info(
150                      "[Jack] " + getName() + " | Locked " + getSharedLock().getName() + "."
151                  );
152                  try {
153                      LOGGER.info(
154                          "[Jack] " + getName() + " | Awaiting on " + getName() + "'s Shared Condition..."
155                      );
156                      getSharedCondition().
157                          // throws InterruptedException
158                          await();
159                  } finally {
160                      unlockSharedLock();
161                      LOGGER.info(
162                          "[Jack] " + getName() + " | Unlocked " + getSharedLock().getName() + "."
163                      );
164                  }
165                  NotificationData _notificationData = null;
166                  while ((_notificationData = getNotificationData()) != null) {
167                      consume(_notificationData);
168                  }
169              } catch (final InterruptedException exception) {
170                  if (LOGGER.isLoggable(Level.WARNING)) {
171                      LOGGER.log(
172                          Level.WARNING,
173                          getName() + " | " +
174                              "Consumer '" + this + "' interrupted.  Stopping..."
175                      );
176                  }
177                  stop();
178              }
179          }
180      }
181 
182      public void stop() {
183          if (isRunning()) {
184              setRunning(false);
185          }
186      }
187 
188      @Override
189      public String toString() {
190          return "AbstractNotificationConsumer[" + classMembersToString() + "]";
191      }
192 
193      protected String classMembersToString() {
194          return
195              "name: '" + getName() + "', " +
196              "notificationProvider: '" + getNotificationProvider() + "', " +
197              "passthroughList: '" + getModifiablePassthroughList() + "', " +
198              "running: '" + isRunning() + "', " +
199              "sharedCondition: '" + getSharedCondition() + "', " +
200              "sharedLock: '" + getSharedLock() + "'";
201      }
202 
203      protected Set<NamedReentrantLock> getLockSet() {
204          return Collections.unmodifiableSet(getModifiableLockSet());
205      }
206 
207      protected Set<NamedReentrantLock> getModifiableLockSet() {
208          return lockSet;
209      }
210 
211      protected NotificationData getNotificationData()
212      throws InterruptedException {
213          NotificationData _notificationData = null;
214          for (final Passthrough<NotificationData> _passthrough : getPassthroughList()) {
215              if ((_notificationData = _passthrough.receive()) != null) {
216                  break;
217              }
218          }
219          return _notificationData;
220      }
221 
222      protected NotificationProvider getNotificationProvider() {
223          return notificationProvider;
224      }
225 
226      protected List<Passthrough<NotificationData>> getPassthroughList() {
227          return Collections.unmodifiableList(getModifiablePassthroughList());
228      }
229 
230      protected List<Passthrough<NotificationData>> getModifiablePassthroughList() {
231          return passthroughList;
232      }
233 
234      protected Object getProperty(
235          final Map<NotificationProvider.Category, Map<String, Object>> categoryToPropertyMap, final String name) {
236 
237          return getProperty(categoryToPropertyMap, name, (Object)null);
238      }
239 
240      protected Object getProperty(
241          final Map<NotificationProvider.Category, Map<String, Object>> categoryToPropertyMap, final String name,
242          final Object defaultValue) {
243 
244          if (isNotNullAndIsNotEmpty(categoryToPropertyMap) && StringUtilities.isNotNullAndIsNotEmpty(name)) {
245              if (categoryToPropertyMap.containsKey(getNotificationProvider().getCategory())) {
246                  Map<String, Object> _propertyMap = categoryToPropertyMap.get(getNotificationProvider().getCategory());
247                  if (isNotNullAndIsNotEmpty(_propertyMap) && _propertyMap.containsKey(name)) {
248                      return _propertyMap.get(name);
249                  }
250              }
251              if (categoryToPropertyMap.containsKey(NotificationProvider.Category.GLOBAL)) {
252                  Map<String, Object> _propertyMap = categoryToPropertyMap.get(NotificationProvider.Category.GLOBAL);
253                  if (isNotNullAndIsNotEmpty(_propertyMap) && _propertyMap.containsKey(name)) {
254                      return _propertyMap.get(name);
255                  }
256              }
257          }
258          return defaultValue;
259      }
260 
261      protected Condition getSharedCondition() {
262          return sharedCondition;
263      }
264 
265      protected NamedReentrantLock getSharedLock() {
266          return sharedLock;
267      }
268 
269      protected void setRunning(final boolean running) {
270          if (isNotEqual(isRunning(), running)) {
271              this.running = running;
272          }
273      }
274      protected void tryLockSharedLock()
275      throws InterruptedException {
276          int _attemptCount = 0;
277          while (!getSharedLock().tryLock(300L, TimeUnit.SECONDS)) {
278              _attemptCount++;
279              if (LOGGER.isLoggable(Level.WARNING)) {
280                  LOGGER.log(
281                      Level.WARNING,
282                      "Trying to acquire Shared Lock failed for " + (_attemptCount * 300L) + "s.  " +
283                          "(" +
284                              "Thread: '" + Thread.currentThread().getName() + "', " +
285                              "Locks held: '" + retainLocksHeldByCurrentThread(getLockSet()) + "'" +
286                          ")"
287                  );
288              }
289          }
290      }
291 
292      protected void unlockSharedLock() {
293          getSharedLock().unlock();
294      }
295  }


feed icon

sventon 2.5.1