1 |
package org.icesoft.notify.cloud.core; |
2 |
|
3 |
import static org.icesoft.util.MapUtilities.isNotNullAndIsNotEmpty; |
4 |
import static org.icesoft.util.ObjectUtilities.isEqual; |
5 |
import static org.icesoft.util.ObjectUtilities.isNotEqual; |
6 |
import static org.icesoft.util.ObjectUtilities.isNotNull; |
7 |
import static org.icesoft.util.PreCondition.checkIfIsNotNull; |
8 |
import static org.icesoft.util.PreCondition.checkIfIsNotNullAndIsNotEmpty; |
9 |
import static org.icesoft.util.concurrent.ReentrantLockUtilities.retainLocksHeldByCurrentThread; |
10 |
|
11 |
import java.util.Collections; |
12 |
import java.util.HashSet; |
13 |
import java.util.List; |
14 |
import java.util.Map; |
15 |
import java.util.Set; |
16 |
import java.util.concurrent.TimeUnit; |
17 |
import java.util.concurrent.locks.Condition; |
18 |
import java.util.logging.Level; |
19 |
import java.util.logging.Logger; |
20 |
|
21 |
import org.icesoft.util.Named; |
22 |
import org.icesoft.util.StringUtilities; |
23 |
import org.icesoft.util.concurrent.NamedReentrantLock; |
24 |
import org.icesoft.util.concurrent.Passthrough; |
25 |
|
26 |
public abstract class AbstractNotificationConsumer |
27 |
implements Named, NotificationConsumer, Runnable { |
28 |
private static final Logger LOGGER = Logger.getLogger(AbstractNotificationConsumer.class.getName()); |
29 |
|
30 |
private final Set<NamedReentrantLock> lockSet = new HashSet<NamedReentrantLock>(); |
31 |
|
32 |
private final String name; |
33 |
private final NotificationProvider notificationProvider; |
34 |
private final List<Passthrough<NotificationData>> passthroughList; |
35 |
private final Condition sharedCondition; |
36 |
private final NamedReentrantLock sharedLock; |
37 |
|
38 |
private boolean running = false; |
39 |
|
40 |
protected AbstractNotificationConsumer( |
41 |
final String name, final NamedReentrantLock sharedLock, final Condition sharedCondition, |
42 |
final List<Passthrough<NotificationData>> passthroughList, final NotificationProvider notificationProvider) |
43 |
throws IllegalArgumentException, NullPointerException { |
44 |
this.name = |
45 |
|
46 |
checkIfIsNotNullAndIsNotEmpty( |
47 |
name, |
48 |
"Illegal argument name: '" + name + "'. Argument cannot be null or empty." |
49 |
); |
50 |
this.sharedLock = |
51 |
|
52 |
checkIfIsNotNull( |
53 |
sharedLock, |
54 |
"Illegal argument sharedLock: '" + sharedLock + "'. Argument cannot be null." |
55 |
); |
56 |
this.sharedCondition = |
57 |
|
58 |
checkIfIsNotNull( |
59 |
sharedCondition, |
60 |
"Illegal argument sharedCondition: '" + sharedCondition + "'. Argument cannot be null." |
61 |
); |
62 |
this.passthroughList = |
63 |
|
64 |
checkIfIsNotNullAndIsNotEmpty( |
65 |
passthroughList, |
66 |
"Illegal argument passthroughList: '" + passthroughList + "'. Argument cannot be null or empty." |
67 |
); |
68 |
this.notificationProvider = |
69 |
|
70 |
checkIfIsNotNull( |
71 |
notificationProvider, |
72 |
"Illegal argument notificationProvider: '" + notificationProvider + "'. Argument cannot be null." |
73 |
); |
74 |
} |
75 |
|
76 |
@Override |
77 |
public boolean equals(final Object object) { |
78 |
return |
79 |
object instanceof AbstractNotificationConsumer && |
80 |
isEqual( |
81 |
((AbstractNotificationConsumer)object).getName(), |
82 |
getName() |
83 |
) && |
84 |
isEqual( |
85 |
((AbstractNotificationConsumer)object).getNotificationProvider(), |
86 |
getNotificationProvider() |
87 |
) && |
88 |
isEqual( |
89 |
((AbstractNotificationConsumer)object).getModifiablePassthroughList(), |
90 |
getModifiablePassthroughList() |
91 |
) && |
92 |
isEqual( |
93 |
((AbstractNotificationConsumer)object).isRunning(), |
94 |
isRunning() |
95 |
) && |
96 |
isEqual( |
97 |
((AbstractNotificationConsumer)object).getSharedCondition(), |
98 |
getSharedCondition() |
99 |
) && |
100 |
isEqual( |
101 |
((AbstractNotificationConsumer)object).getSharedLock(), |
102 |
getSharedLock() |
103 |
); |
104 |
} |
105 |
|
106 |
public String getName() { |
107 |
return name; |
108 |
} |
109 |
|
110 |
@Override |
111 |
public int hashCode() { |
112 |
int _hashCode; |
113 |
_hashCode = |
114 |
isNotNull(getName()) ? getName().hashCode() : 0; |
115 |
_hashCode = |
116 |
31 * _hashCode + |
117 |
(isNotNull(getNotificationProvider()) ? getNotificationProvider().hashCode() : 0); |
118 |
_hashCode = |
119 |
31 * _hashCode + |
120 |
(isNotNull(getModifiablePassthroughList()) ? getModifiablePassthroughList().hashCode() : 0); |
121 |
_hashCode = |
122 |
31 * _hashCode + |
123 |
(isRunning() ? Boolean.TRUE.hashCode() : Boolean.FALSE.hashCode()); |
124 |
_hashCode = |
125 |
31 * _hashCode + |
126 |
(isNotNull(getSharedCondition()) ? getSharedCondition().hashCode() : 0); |
127 |
_hashCode = |
128 |
31 * _hashCode + |
129 |
(isNotNull(getSharedLock()) ? getSharedLock().hashCode() : 0); |
130 |
return _hashCode; |
131 |
} |
132 |
|
133 |
public boolean isRunning() { |
134 |
return running; |
135 |
} |
136 |
|
137 |
public void run() { |
138 |
if (!isRunning()) { |
139 |
setRunning(true); |
140 |
} |
141 |
while (isRunning()) { |
142 |
try { |
143 |
|
144 |
|
145 |
LOGGER.info( |
146 |
"[Jack] " + getName() + " | Trying to lock " + getSharedLock().getName() + "..." |
147 |
); |
148 |
tryLockSharedLock(); |
149 |
LOGGER.info( |
150 |
"[Jack] " + getName() + " | Locked " + getSharedLock().getName() + "." |
151 |
); |
152 |
try { |
153 |
LOGGER.info( |
154 |
"[Jack] " + getName() + " | Awaiting on " + getName() + "'s Shared Condition..." |
155 |
); |
156 |
getSharedCondition(). |
157 |
|
158 |
await(); |
159 |
} finally { |
160 |
unlockSharedLock(); |
161 |
LOGGER.info( |
162 |
"[Jack] " + getName() + " | Unlocked " + getSharedLock().getName() + "." |
163 |
); |
164 |
} |
165 |
NotificationData _notificationData = null; |
166 |
while ((_notificationData = getNotificationData()) != null) { |
167 |
consume(_notificationData); |
168 |
} |
169 |
} catch (final InterruptedException exception) { |
170 |
if (LOGGER.isLoggable(Level.WARNING)) { |
171 |
LOGGER.log( |
172 |
Level.WARNING, |
173 |
getName() + " | " + |
174 |
"Consumer '" + this + "' interrupted. Stopping..." |
175 |
); |
176 |
} |
177 |
stop(); |
178 |
} |
179 |
} |
180 |
} |
181 |
|
182 |
public void stop() { |
183 |
if (isRunning()) { |
184 |
setRunning(false); |
185 |
} |
186 |
} |
187 |
|
188 |
@Override |
189 |
public String toString() { |
190 |
return "AbstractNotificationConsumer[" + classMembersToString() + "]"; |
191 |
} |
192 |
|
193 |
protected String classMembersToString() { |
194 |
return |
195 |
"name: '" + getName() + "', " + |
196 |
"notificationProvider: '" + getNotificationProvider() + "', " + |
197 |
"passthroughList: '" + getModifiablePassthroughList() + "', " + |
198 |
"running: '" + isRunning() + "', " + |
199 |
"sharedCondition: '" + getSharedCondition() + "', " + |
200 |
"sharedLock: '" + getSharedLock() + "'"; |
201 |
} |
202 |
|
203 |
protected Set<NamedReentrantLock> getLockSet() { |
204 |
return Collections.unmodifiableSet(getModifiableLockSet()); |
205 |
} |
206 |
|
207 |
protected Set<NamedReentrantLock> getModifiableLockSet() { |
208 |
return lockSet; |
209 |
} |
210 |
|
211 |
protected NotificationData getNotificationData() |
212 |
throws InterruptedException { |
213 |
NotificationData _notificationData = null; |
214 |
for (final Passthrough<NotificationData> _passthrough : getPassthroughList()) { |
215 |
if ((_notificationData = _passthrough.receive()) != null) { |
216 |
break; |
217 |
} |
218 |
} |
219 |
return _notificationData; |
220 |
} |
221 |
|
222 |
protected NotificationProvider getNotificationProvider() { |
223 |
return notificationProvider; |
224 |
} |
225 |
|
226 |
protected List<Passthrough<NotificationData>> getPassthroughList() { |
227 |
return Collections.unmodifiableList(getModifiablePassthroughList()); |
228 |
} |
229 |
|
230 |
protected List<Passthrough<NotificationData>> getModifiablePassthroughList() { |
231 |
return passthroughList; |
232 |
} |
233 |
|
234 |
protected Object getProperty( |
235 |
final Map<NotificationProvider.Category, Map<String, Object>> categoryToPropertyMap, final String name) { |
236 |
|
237 |
return getProperty(categoryToPropertyMap, name, (Object)null); |
238 |
} |
239 |
|
240 |
protected Object getProperty( |
241 |
final Map<NotificationProvider.Category, Map<String, Object>> categoryToPropertyMap, final String name, |
242 |
final Object defaultValue) { |
243 |
|
244 |
if (isNotNullAndIsNotEmpty(categoryToPropertyMap) && StringUtilities.isNotNullAndIsNotEmpty(name)) { |
245 |
if (categoryToPropertyMap.containsKey(getNotificationProvider().getCategory())) { |
246 |
Map<String, Object> _propertyMap = categoryToPropertyMap.get(getNotificationProvider().getCategory()); |
247 |
if (isNotNullAndIsNotEmpty(_propertyMap) && _propertyMap.containsKey(name)) { |
248 |
return _propertyMap.get(name); |
249 |
} |
250 |
} |
251 |
if (categoryToPropertyMap.containsKey(NotificationProvider.Category.GLOBAL)) { |
252 |
Map<String, Object> _propertyMap = categoryToPropertyMap.get(NotificationProvider.Category.GLOBAL); |
253 |
if (isNotNullAndIsNotEmpty(_propertyMap) && _propertyMap.containsKey(name)) { |
254 |
return _propertyMap.get(name); |
255 |
} |
256 |
} |
257 |
} |
258 |
return defaultValue; |
259 |
} |
260 |
|
261 |
protected Condition getSharedCondition() { |
262 |
return sharedCondition; |
263 |
} |
264 |
|
265 |
protected NamedReentrantLock getSharedLock() { |
266 |
return sharedLock; |
267 |
} |
268 |
|
269 |
protected void setRunning(final boolean running) { |
270 |
if (isNotEqual(isRunning(), running)) { |
271 |
this.running = running; |
272 |
} |
273 |
} |
274 |
protected void tryLockSharedLock() |
275 |
throws InterruptedException { |
276 |
int _attemptCount = 0; |
277 |
while (!getSharedLock().tryLock(300L, TimeUnit.SECONDS)) { |
278 |
_attemptCount++; |
279 |
if (LOGGER.isLoggable(Level.WARNING)) { |
280 |
LOGGER.log( |
281 |
Level.WARNING, |
282 |
"Trying to acquire Shared Lock failed for " + (_attemptCount * 300L) + "s. " + |
283 |
"(" + |
284 |
"Thread: '" + Thread.currentThread().getName() + "', " + |
285 |
"Locks held: '" + retainLocksHeldByCurrentThread(getLockSet()) + "'" + |
286 |
")" |
287 |
); |
288 |
} |
289 |
} |
290 |
} |
291 |
|
292 |
protected void unlockSharedLock() { |
293 |
getSharedLock().unlock(); |
294 |
} |
295 |
} |