@@ -1,51 +1,65 @@
package org.icesoft.notify.cloud.core.email;
+import static org.icesoft.util.ObjectUtilities.getValue;
import static org.icesoft.util.ObjectUtilities.isEqual;
import static org.icesoft.util.ObjectUtilities.isNotEqual;
import static org.icesoft.util.ObjectUtilities.isNotNull;
import static org.icesoft.util.PreCondition.checkIfIsNotNull;
import static org.icesoft.util.PreCondition.checkIfIsNotNullAndIsNotEmpty;
+import static org.icesoft.util.StringUtilities.isNotEmpty;
+import static org.icesoft.util.StringUtilities.isNotNullAndIsNotEmpty;
+import static org.icesoft.util.StringUtilities.isNullOrIsEmpty;
+import java.io.BufferedReader;
import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.locks.Condition;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.icesoft.notify.cloud.core.CloudNotificationEndpoint;
+import org.icesoft.notify.cloud.core.AbstractNotificationConsumer;
+import org.icesoft.notify.cloud.core.NotificationConsumer;
+import org.icesoft.notify.cloud.core.NotificationData;
+import org.icesoft.notify.cloud.core.NotificationProvider;
import org.icesoft.util.Named;
-
-import javax.json.Json;
+import org.icesoft.util.concurrent.NamedReentrantLock;
+import org.icesoft.util.concurrent.Passthrough;
public class EmailNotificationConsumer
-implements Named, Runnable {
+extends AbstractNotificationConsumer
+implements Named, NotificationConsumer, Runnable {
private static final Logger LOGGER = Logger.getLogger(EmailNotificationConsumer.class.getName());
+ private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss (z)");
+ static {
+ DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("GMT"));
+ }
private static final String NAME_PREFIX = "Email Notification Consumer";
private final EmailConnection connection;
private final String id;
- private final String name;
- private final BlockingQueue<EmailNotification> notificationQueue;
-
- private boolean running = false;
public EmailNotificationConsumer(
- final String id, final BlockingQueue<EmailNotification> notificationQueue, final EmailConnection connection)
+ final String id, final NamedReentrantLock sharedLock, final Condition sharedCondition,
+ final List<Passthrough<NotificationData>> passthroughList, final EmailConnection connection,
+ final NotificationProvider notificationProvider)
throws IllegalArgumentException, NullPointerException {
+ super(NAME_PREFIX + " [" + id + "]", sharedLock, sharedCondition, passthroughList, notificationProvider);
this.id =
// throws IllegalArgumentException
checkIfIsNotNullAndIsNotEmpty(
id,
"Illegal argument id: '" + id + ". Argument cannot be null or empty."
);
- this.name = new StringBuilder().append(NAME_PREFIX).append(" [").append(getID()).append("]").toString();
- this.notificationQueue =
- // throws NullPointerException
- checkIfIsNotNull(
- notificationQueue,
- "Illegal argument notificationQueue: '" + notificationQueue + "'. Argument cannot be null."
- );
this.connection =
// throws NullPointerException
checkIfIsNotNull(
@@ -54,6 +68,41 @@
);
}
+ public void consume(final NotificationData notificationData) {
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.log(
+ Level.FINE,
+ "Sending E-mail Notification with " +
+ "Properties '" + notificationData.getCategoryToPropertyMap() + "' to " +
+ "Notification Endpoint '" + notificationData.getNotificationEndpoint() + "'."
+ );
+ }
+ EmailNotification _notification = constructNotification(notificationData);
+ if (isNotNull(_notification)) {
+ try {
+ getConnection().send(_notification);
+ } catch (final IOException exception) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(
+ Level.WARNING,
+ getName() + " | " +
+ "Failed to send Email Notification to " +
+ "Notify-Back-URI '" +
+ _notification.getNotificationEndpoint().getNotifyBackURI() +
+ "'. " +
+ "(" +
+ "Notification Payload: '" + _notification.getContent() + "', " +
+ "Host: '" + getConnection().getHost() + "', " +
+ "Port: '" + getConnection().getPort() + "', " +
+ "Message: '" + exception.getMessage() + "'" +
+ ")",
+ exception
+ );
+ }
+ }
+ }
+ }
+
@Override
public boolean equals(final Object object) {
return
@@ -66,184 +115,203 @@
((EmailNotificationConsumer)object).getID(),
getID()
) &&
- isEqual(
- ((EmailNotificationConsumer)object).getName(),
- getName()
- ) &&
- isEqual(
- ((EmailNotificationConsumer)object).getModifiableNotificationQueue(),
- getModifiableNotificationQueue()
- );
- }
-
- public String getName() {
- return name;
+ super.equals(object);
}
@Override
public int hashCode() {
int _hashCode;
- _hashCode = getConnection() != null ? getConnection().hashCode() : 0;
- _hashCode =
- 31 * _hashCode +
- (getID() != null ? getID().hashCode() : 0);
- _hashCode =
- 31 * _hashCode +
- (getName() != null ? getName().hashCode() : 0);
- _hashCode =
- 31 * _hashCode +
- (getModifiableNotificationQueue() != null ? getModifiableNotificationQueue().hashCode() : 0);
+ _hashCode = super.hashCode();
+ _hashCode = 31 * _hashCode + (getConnection() != null ? getConnection().hashCode() : 0);
+ _hashCode = 31 * _hashCode + (getID() != null ? getID().hashCode() : 0);
return _hashCode;
}
- public void run() {
- try {
- if (!isRunning()) {
- setRunning(true);
- }
- long _heartbeatTimestamp = System.currentTimeMillis();
- while (isRunning()) {
- try {
- EmailNotification _notification =
- getModifiableNotificationQueue().
- // throws InterruptedException
- poll(30, TimeUnit.SECONDS);
- if (isNotNull(_notification)) {
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.log(
- Level.FINE,
- getName() + " | " +
- "Successfully taken Notification '" + _notification + "' off the Queue. " +
- "(Queue size: '" + getModifiableNotificationQueue().size() + "')"
- );
- }
- if (LOGGER.isLoggable(Level.FINE)) {
- for (final CloudNotificationEndpoint _cloudNotificationEndpoint :
- _notification.getCloudNotificationEndpointList()) {
-
- String _batchID = (String)_cloudNotificationEndpoint.getPropertyMap().get("batchID");
- String _pushID = (String)_cloudNotificationEndpoint.getPropertyMap().get("pushID");
- if (isNotNull(_batchID) && isNotNull(_pushID)) {
- LOGGER.log(
- Level.FINE,
- "<short> | " +
- Json.createObjectBuilder().
- add("batchId", _batchID).
- add("pushId", _pushID).
- add("internalQueuePollTime", System.currentTimeMillis()).
- build().
- toString()
- );
- }
- }
- }
- try {
- getConnection().
- // throws IOException
- send(_notification);
- } catch (final IOException exception) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(
- Level.WARNING,
- getName() + " | " +
- "Failed to send Email Cloud Push Notification " +
- "to Notify-Back-URIs '" + _notification.getNotifyBackURIList() + "'. " +
- "(" +
- "Notification Payload: '" + _notification.getContent() + "', " +
- "Host: '" + getConnection().getHost() + "', " +
- "Port: '" + getConnection().getPort() + "', " +
- "Message: '" + exception.getMessage() + "'" +
- ")",
- exception
- );
- }
- }
- _heartbeatTimestamp = System.currentTimeMillis();
- } else {
- if ((System.currentTimeMillis() - _heartbeatTimestamp) >= 300000) {
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.log(
- Level.FINE,
- getName() + " | " +
- "Heartbeat. " +
- "(" +
- "Max Connections: '" +
- getConnection().getNotificationProvider().getMaxConnections() +
- "'" +
- ")"
- );
- }
- _heartbeatTimestamp = System.currentTimeMillis();
- }
- }
- } catch (final InterruptedException exception) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(
- Level.WARNING,
- getName() + " | " +
- "Consumer '" + this + "' interrupted. Stopping..."
- );
- }
- stop();
- }
- }
- } catch (final Throwable throwable) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(
- Level.WARNING,
- getName() + " | " +
- "An error occurred while trying to consume an Email notification from the queue.",
- throwable
- );
- }
- }
- }
-
- public void stop() {
- if (isRunning()) {
- setRunning(false);
- }
- }
-
@Override
public String toString() {
- return
- new StringBuilder().
- append("EmailNotificationConsumer[").
- append(classMembersToString()).
- append("]").
- toString();
+ return "EmailNotificationConsumer[" + classMembersToString() + "]";
}
protected String classMembersToString() {
+ String _string = super.classMembersToString();
+ return
+ isNotEmpty(_string) ? _string + ", " : "" +
+ "connection: '" + getConnection() + "', " +
+ "id: '" + getID() + "'";
+ }
+
+ protected EmailNotification constructNotification(final NotificationData notificationData) {
+ Content _content = getContent(notificationData.getCategoryToPropertyMap());
return
- new StringBuilder().
- append("connection: '").append(getConnection()).append("', ").
- append("id: '").append(getID()).append("', ").
- append("name: '").append(getName()).append("', ").
- append("notificationQueue: '").append(getModifiableNotificationQueue()).append("'").
- toString();
+ new EmailNotification(
+ notificationData.getNotificationEndpoint(),
+ (String)
+ getProperty(
+ notificationData.getCategoryToPropertyMap(),
+ EmailNotificationProvider.EmailProperty.Name.PERSONAL
+ ),
+ (String)
+ getProperty(
+ notificationData.getCategoryToPropertyMap(),
+ EmailNotificationProvider.EmailProperty.Name.SUBJECT
+ ),
+ _content.getMessage(),
+ _content.getContentType()
+ );
}
protected EmailConnection getConnection() {
return connection;
}
+ protected Content getContent(final Map<NotificationProvider.Category, Map<String, Object>> categoryToPropertyMap) {
+ Content _content = new Content();
+ String _template =
+ (String)getProperty(categoryToPropertyMap, EmailNotificationProvider.EmailProperty.Name.TEMPLATE);
+ if (isNotNullAndIsNotEmpty(_template)) {
+ try {
+ URLConnection _urlConnection = new URL(_template).openConnection();
+ BufferedReader _in = new BufferedReader(new InputStreamReader(_urlConnection.getInputStream()));
+ StringBuilder _entityBody = new StringBuilder();
+ String _line;
+ while ((_line = _in.readLine()) != null) {
+ _entityBody.append(_line);
+ }
+ _in.close();
+ _content.
+ setMessage(
+ _entityBody.toString().
+ replace(
+ "{{subject}}",
+ (String)
+ getProperty(
+ categoryToPropertyMap,
+ EmailNotificationProvider.EmailProperty.Name.SUBJECT
+ )
+ ).
+ replace(
+ "{{detail}}",
+ (String)
+ getProperty(
+ categoryToPropertyMap,
+ EmailNotificationProvider.EmailProperty.Name.DETAIL
+ )
+ ).
+ replace(
+ "{{priority}}",
+ (String)
+ getValue(
+ getProperty(
+ categoryToPropertyMap,
+ EmailNotificationProvider.EmailProperty.Name.PRIORITY
+ ),
+ "default"
+ )
+ ).
+ replace(
+ "{{time}}",
+ (String)
+ getValue(
+ getProperty(
+ categoryToPropertyMap,
+ EmailNotificationProvider.EmailProperty.Name.TIME
+ ),
+ DATE_FORMAT.format(new Date(System.currentTimeMillis()))
+ )
+ ).
+ replace(
+ "{{url}}",
+ (String)
+ getProperty(
+ categoryToPropertyMap,
+ EmailNotificationProvider.EmailProperty.Name.URL
+ )
+ )
+ );
+ _content.
+ setContentType(
+ (String)
+ getProperty(
+ categoryToPropertyMap,
+ EmailNotificationProvider.EmailProperty.Name.CONTENT_TYPE,
+ "text/html"
+ )
+ );
+ } catch (final MalformedURLException exception) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(
+ Level.WARNING,
+ "Malformed URL for Template: '" + _template + "'. Reverting to plain text format."
+ );
+ }
+ } catch (final IOException exception) {
+ if (LOGGER.isLoggable(Level.WARNING)) {
+ LOGGER.log(
+ Level.WARNING,
+ "An I/O error occurred: '" + exception.getMessage() + "'. Reverting to plain text format."
+ );
+ }
+ }
+ }
+ if (isNullOrIsEmpty(_content.getMessage())) {
+ _content.
+ setContentType(
+ (String) getProperty(categoryToPropertyMap, EmailNotificationProvider.EmailProperty.Name.CONTENT_TYPE, "text/plain")
+ );
+ if (_content.getContentType().equals("text/html")) {
+ _content.setContentType(_content.getContentType() + "; charset=utf-8");
+ }
+ if (_content.getContentType().equals("text/plain")) {
+ _content.
+ setMessage(
+ new StringBuilder().
+ append(getProperty(categoryToPropertyMap, EmailNotificationProvider.EmailProperty.Name.DETAIL)).
+ toString()
+ );
+ } else if (_content.getContentType().startsWith("text/html")) {
+ _content.
+ setMessage(
+ new StringBuffer().
+ append(getProperty(categoryToPropertyMap, EmailNotificationProvider.EmailProperty.Name.DETAIL)).
+ toString()
+ );
+ } else {
+ // TODO: Log a warning?
+ }
+ }
+ return _content;
+ }
+
protected String getID() {
return id;
}
- protected BlockingQueue<EmailNotification> getModifiableNotificationQueue() {
- return notificationQueue;
- }
+ private static class Content {
+ private String contentType;
+ private String message;
- protected boolean isRunning() {
- return running;
- }
+ private Content() {
+ // Do nothing.
+ }
+
+ private String getContentType() {
+ return contentType;
+ }
+
+ private String getMessage() {
+ return message;
+ }
- protected void setRunning(final boolean running) {
- if (isNotEqual(isRunning(), running)) {
- this.running = running;
+ private void setContentType(final String contentType) {
+ if (isNotEqual(getContentType(), contentType)) {
+ this.contentType = contentType;
+ }
+ }
+
+ private void setMessage(final String message) {
+ if (isNotEqual(getMessage(), message)) {
+ this.message = message;
+ }
}
}
}
|