sventon subversion web client - http://www.sventon.org
[show recent changes]
 
  Help
HEAD
Rev: 53043 - http://anonsvn.icesoft.org/repo / icenotify / tags / vras-1.3.1 / icenotify / cloud / src / main / java / org / icesoft / notify / cloud / core / NotificationProviderBlockingQueueListener.java
Show File - NotificationProviderBlockingQueueListener.java  [show properties]
spinner
package org.icesoft.notify.cloud.core;
import static org.icesoft.util.ObjectUtilities.isNotEqual;
import static org.icesoft.util.PreCondition.checkIfIsNotNull;
import com.icesoft.notify.data.Event;
import com.icesoft.notify.util.messaging.MessagingException;
import com.icesoft.notify.util.messaging.aws.AmazonMessagingClient;
10  import java.util.Date;
11  import java.util.UUID;
12  import java.util.logging.Level;
13  import java.util.logging.Logger;
14 
15  import javax.servlet.ServletContext;
16 
17  import org.icesoft.util.concurrent.BlockingQueueEvent;
18  import org.icesoft.util.concurrent.BlockingQueueListener;
19 
20  public class NotificationProviderBlockingQueueListener
21  implements BlockingQueueListener {
22      private static final Logger LOGGER = Logger.getLogger(NotificationProviderBlockingQueueListener.class.getName());
23 
24      private final String name;
25      private final ServletContext servletContext;
26 
27      private long lastConsumeTimestamp = System.currentTimeMillis();
28      private String status = "ok";
29 
30      public NotificationProviderBlockingQueueListener(final String name, final ServletContext servletContext)
31      throws IllegalArgumentException, NullPointerException {
32          super();
33          this.name =
34              checkIfIsNotNull(
35                  name,
36                  "Illegal argument name: '" + name + "'.  Argument cannot be null."
37              );
38          this.servletContext =
39              checkIfIsNotNull(
40                  servletContext,
41                  "Illegal argument servletContext: '" + servletContext + "'.  Argument cannot be null."
42              );
43      }
44 
45      public void onAdd(final BlockingQueueEvent event) {
46          LOGGER.info("[Jack] onAdd(...)");
47          if (getStatus().equalsIgnoreCase("ok") && System.currentTimeMillis() - getLastConsumeTimestamp() > 90000) {
48              setStatus("alarm");
49              publishBlockingQueueEvent(getName(), getStatus(), "Consumers stopped consuming for the last 90 seconds.");
50          }
51      }
52 
53      public void onAddAll(final BlockingQueueEvent event) {
54          LOGGER.info("[Jack] onAddAll(...)");
55          if (getStatus().equalsIgnoreCase("ok") && System.currentTimeMillis() - getLastConsumeTimestamp() > 90000) {
56              setStatus("alarm");
57              publishBlockingQueueEvent(getName(), getStatus(), "Consumers stopped consuming for the last 90 seconds.");
58          }
59      }
60 
61      public void onElement(BlockingQueueEvent event) {
62          LOGGER.info("[Jack] onElement(...)");
63          // Do nothing.
64      }
65 
66      public void onOffer(final BlockingQueueEvent event) {
67          LOGGER.info("[Jack] onOffer(...)");
68          if (getStatus().equalsIgnoreCase("ok") && System.currentTimeMillis() - getLastConsumeTimestamp() > 90000) {
69              setStatus("alarm");
70              publishBlockingQueueEvent(getName(), getStatus(), "Consumers stopped consuming for the last 90 seconds.");
71          }
72      }
73 
74      public void onPeek(final BlockingQueueEvent event) {
75          LOGGER.info("[Jack] onPeek(...)");
76          // Do nothing.
77      }
78 
79      public void onPoll(final BlockingQueueEvent event) {
80          LOGGER.info("[Jack] onPoll(...)");
81          setLastConsumeTimestamp(System.currentTimeMillis());
82          if (getStatus().equalsIgnoreCase("alarm")) {
83              setStatus("ok");
84              publishBlockingQueueEvent(getName(), getStatus(), "Consumers started consuming again.");
85          }
86      }
87 
88      public void onPut(final BlockingQueueEvent event) {
89          LOGGER.info("[Jack] onPut(...)");
90          if (getStatus().equalsIgnoreCase("ok") && System.currentTimeMillis() - getLastConsumeTimestamp() > 90000) {
91              setStatus("alarm");
92              publishBlockingQueueEvent(getName(), getStatus(), "Consumers stopped consuming for the last 90 seconds.");
93          }
94      }
95 
96      public void onRemove(final BlockingQueueEvent event) {
97          LOGGER.info("[Jack] onRemove(...)");
98          setLastConsumeTimestamp(System.currentTimeMillis());
99          if (getStatus().equalsIgnoreCase("alarm")) {
100              setStatus("ok");
101              publishBlockingQueueEvent(getName(), getStatus(), "Consumers started consuming again.");
102          }
103      }
104 
105      public void onRemoveAll(final BlockingQueueEvent event) {
106          LOGGER.info("[Jack] onRemoveAll(...)");
107          setLastConsumeTimestamp(System.currentTimeMillis());
108          if (getStatus().equalsIgnoreCase("alarm")) {
109              setStatus("ok");
110              publishBlockingQueueEvent(getName(), getStatus(), "Consumers started consuming again.");
111          }
112      }
113 
114      public void onTake(final BlockingQueueEvent event) {
115          LOGGER.info("[Jack] onTake(...)");
116          setLastConsumeTimestamp(System.currentTimeMillis());
117          if (getStatus().equalsIgnoreCase("alarm")) {
118              setStatus("ok");
119              publishBlockingQueueEvent(getName(), getStatus(), "Consumers started consuming again.");
120          }
121      }
122 
123      protected long getLastConsumeTimestamp() {
124          return lastConsumeTimestamp;
125      }
126 
127      protected String getName() {
128          return name;
129      }
130 
131      protected ServletContext getServletContext() {
132          return servletContext;
133      }
134 
135      protected String getStatus() {
136          return status;
137      }
138 
139      protected void publish(final Event event) {
140          try {
141              AmazonMessagingClient.getInstance(getServletContext()).
142                  publishToTopic(event.toJSONObject(), "cloud");
143          } catch (final IllegalStateException exception) {
144              if (LOGGER.isLoggable(Level.WARNING)) {
145                  LOGGER.log(
146                      Level.WARNING,
147                      "Illegal state.",
148                      exception
149                  );
150              }
151          } catch (final MessagingException exception) {
152              if (LOGGER.isLoggable(Level.WARNING)) {
153                  LOGGER.log(
154                      Level.WARNING,
155                      "A messaging error occurred while trying to publish " +
156                          "Event '" + event + "'.",
157                      exception
158                  );
159              }
160          }
161      }
162 
163      protected void publishBlockingQueueEvent(final String name, final String status, final String message) {
164          Event _blockingQueueEvent = new Event();
165          _blockingQueueEvent.setService("cloud");
166          _blockingQueueEvent.setEvent("queueConsumer");
167          _blockingQueueEvent.setType("status");
168          _blockingQueueEvent.setUserName("voyent");
169          _blockingQueueEvent.setTime(new Date(System.currentTimeMillis()));
170          _blockingQueueEvent.setTransactionID(UUID.randomUUID().toString());
171          _blockingQueueEvent.setData(
172              new NotificationProviderBlockingQueueEventData().
173                  withName(name).
174                  withStatus(status).
175                  withMessage(message).
176                      toJSONObject()
177          );
178          publish(_blockingQueueEvent);
179      }
180 
181      protected void setLastConsumeTimestamp(final long lastConsumeTimestamp) {
182          if (isNotEqual(getLastConsumeTimestamp(), lastConsumeTimestamp)) {
183              this.lastConsumeTimestamp = lastConsumeTimestamp;
184          }
185      }
186 
187      protected void setStatus(final String status) {
188          if (isNotEqual(getStatus(), status)) {
189              this.status = status;
190          }
191      }
192  }


feed icon

sventon 2.5.1