- itemsIterator = persistedEntryItems.iterator();
+ while (itemsIterator.hasNext()) {
+ Item item = itemsIterator.next();
+ persistedEntriesObject.add(item.toJSONPretty());
+ }
+ return persistedEntriesObject;
+ }
+
+ /**
+ * Bumps the per-feed "entries-created" metrics counter, creating the
+ * counter lazily on first use for the given feed name.
+ *
+ * NOTE(review): this is double-checked locking, but the outer containsKey
+ * and the trailing get are unsynchronized — that is only safe if counterMap
+ * is a concurrent map (e.g. ConcurrentHashMap); confirm the field's type.
+ *
+ * @param feedName feed whose creation counter is incremented
+ */
+ private void incrementCounterForFeed(String feedName) {
+
+ if (!counterMap.containsKey(feedName)) {
+ synchronized (counterMap) {
+ // Re-check inside the lock so only one thread registers the counter.
+ if (!counterMap.containsKey(feedName)) {
+ Counter counter = Metrics.newCounter(DynamoDBFeedPublisher
+ .class, "entries-created-for-" + feedName);
+ counterMap.put(feedName, counter);
+ }
+ }
+ }
+
+ counterMap.get(feedName).inc();
+ }
+
+ /**
+ * Formats the given Date as "yyyy-MM-dd HH:mm:ss", the string layout used
+ * when persisting dates to DynamoDB.
+ *
+ * A new SimpleDateFormat is created per call on purpose: SimpleDateFormat
+ * is not thread-safe and must not be cached in a shared field.
+ *
+ * @param date Date object to be formatted; must not be null.
+ * @return the formatted date string.
+ */
+ private String getDateFormatInString(Date date) {
+ Format formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ return formatter.format(date);
+ }
+
+ /**
+ * Formats the given Date in the AWS/ISO-8601 layout "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'".
+ *
+ * NOTE(review): no time zone is set on the formatter, so the literal 'Z'
+ * suffix is emitted even though the time is rendered in the JVM default
+ * zone — confirm callers expect UTC and consider setTimeZone(UTC).
+ *
+ * @param date Date object to be formatted; must not be null.
+ * @return the ISO-8601-style date string.
+ */
+ private static String getDateFormatInStringAWS(Date date) {
+ Format formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ return formatter.format(date);
+ }
+}
diff --git a/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/adapter/DynamoDBFeedSource.java b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/adapter/DynamoDBFeedSource.java
new file mode 100644
index 000000000..a45677b2c
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/adapter/DynamoDBFeedSource.java
@@ -0,0 +1,966 @@
+package org.atomhopper.dynamodb.adapter;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBQueryExpression;
+import com.amazonaws.services.dynamodbv2.document.*;
+import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
+import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.TimerContext;
+import org.apache.abdera.Abdera;
+import org.apache.abdera.model.Document;
+import org.apache.abdera.model.Entry;
+import org.apache.abdera.model.Feed;
+import org.apache.abdera.model.Link;
+import org.apache.commons.lang.StringUtils;
+import org.atomhopper.adapter.*;
+import org.atomhopper.adapter.request.adapter.GetEntryRequest;
+import org.atomhopper.adapter.request.adapter.GetFeedRequest;
+import org.atomhopper.dbal.PageDirection;
+import org.atomhopper.dynamodb.constant.DynamoDBConstant;
+import org.atomhopper.dynamodb.model.PersistedEntry;
+import org.atomhopper.dynamodb.query.JsonUtil;
+import org.atomhopper.dynamodb.query.SQLToNoSqlConverter;
+import org.atomhopper.dynamodb.query.SearchType;
+import org.atomhopper.dynamodb.query.DynamoDBQueryBuilder;
+import org.atomhopper.response.AdapterResponse;
+import org.atomhopper.util.uri.template.EnumKeyedTemplateParameters;
+import org.atomhopper.util.uri.template.URITemplate;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.StringReader;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.abdera.i18n.text.UrlEncoding.decode;
+
+/**
+ * @author shub6691
+ * Implements the DynamoDBFeedSource interface for retrieving feed entries from a datastore. This class implements
+ * the following:
+ *
+ *
+ * - Generating the feed entries from PersistedEntry instances
+ * - Accessing data from a dynamodb table where all categories are treated equally
+ * - Read categories with predefined prefixes from specified columns for better search performance
+ *
+ *
+ * Mapping category prefixes to DynamoDB attributes is done through the following:
+ *
+ * - PrefixColumnMap - maps a prefix key to a column name. E.g., 'tid' to 'tenantid'
+ * - Delimiter - used to extract the prefix from a category. E.g., if the delimiter is ':' the category
+ * value would be 'tid:1234'
+ *
+ */
+public class DynamoDBFeedSource implements FeedSource {
+
+ // Class-wide logger. NOTE(review): conventionally 'private static final'.
+ static Logger LOG = LoggerFactory.getLogger(DynamoDBFeedSource.class);
+ // Low-level AWS client this source was constructed with.
+ private AmazonDynamoDBClient amazonDynamoDBClient;
+ // Document-API wrapper around the low-level client.
+ private DynamoDB dynamoDB;
+ // Object mapper used for PersistedEntry queries.
+ private DynamoDBMapper mapper;
+ // Metrics timers are disabled unless explicitly enabled.
+ private boolean enableTimers = false;
+ // When true, pages shorter than the requested size are logged with their UUIDs.
+ private boolean enableLoggingOnShortPage = false;
+ private int feedHeadDelayInSeconds = 2;
+
+ // Category prefix -> attribute name (e.g. "tid" -> "tenantid") and its inverse.
+ private Map mapPrefix = new HashMap();
+ private Map mapColumn = new HashMap();
+
+ // Delimiter separating a category prefix from its value (e.g. ':' in "tid:1234").
+ private String split;
+
+ // Shared helper for archive/current-link handling.
+ private AdapterHelper helper = new AdapterHelper();
+
+ private SQLToNoSqlConverter getSearchToSqlConverter() {
+
+ return new SQLToNoSqlConverter(mapPrefix, split);
+ }
+
+ public Boolean getEnableLoggingOnShortPage() {
+ return enableLoggingOnShortPage;
+ }
+
+ public void setPrefixColumnMap(Map prefix) {
+
+ mapPrefix = new HashMap(prefix);
+
+ mapColumn = new HashMap();
+
+ for (String key : mapPrefix.keySet()) {
+
+ mapColumn.put(mapPrefix.get(key), key);
+ }
+ }
+
+ public void setDelimiter(String splitParam) {
+
+ split = splitParam;
+ }
+
+ @NotImplemented
+ public void setParameters(Map params) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ public void afterPropertiesSet() {
+
+ if (split != null ^ !mapPrefix.isEmpty()) {
+
+ throw new IllegalArgumentException("The 'delimiter' and 'prefixColumnMap' field must both be defined");
+ }
+ }
+
+
/**
 * Creates a feed source backed by the given low-level DynamoDB client,
 * wiring up the document-API wrapper and the object mapper.
 *
 * @param amazonDynamoDBClient configured AWS DynamoDB client
 */
public DynamoDBFeedSource(AmazonDynamoDBClient amazonDynamoDBClient) {
    this.amazonDynamoDBClient = amazonDynamoDBClient;
    this.dynamoDB = new DynamoDB(this.amazonDynamoDBClient);
    this.mapper = new DynamoDBMapper(amazonDynamoDBClient);
    // (removed) setDynamoDB(dynamoDB): it re-assigned the same reference to
    // the same field and invoked an overridable method from the constructor.
}
+
+ public void setDynamoDB(DynamoDB dynamoDB) {
+ this.dynamoDB = dynamoDB;
+ }
+
+ /**
+ * This method is used to return the categories based search with the help of feed and entryId and search type is backward
+ * whch means it will search for date less the markerDate along with other params.
+ *
+ * @param feedName: Name of the feed for each entry . For ex: namespace/feed
+ * @param markerTimestamp: Timestamp for which the search is to be performed for category
+ * @param markerId: EntryID for every event.
+ * @param searchString: The string on which search is performed in db
+ * @param pageSize: default size is 25
+ * @return List of PersistedEntry data found based on above params from db.
+ */
+ public List getFeedBackward(String feedName,
+ String markerTimestamp,
+ String markerId,
+ String searchString,
+ int pageSize) {
+ List feedPage;
+ DynamoDBQueryBuilder sqlBac = new DynamoDBQueryBuilder(getSearchToSqlConverter()).searchString(searchString);
+ sqlBac.searchType(SearchType.FEED_BACKWARD);
+ //Dynamodb query implementation
+ Map map = new HashMap();
+ String filters = sqlBac.getFilters(map);
+ String feedNameFilter = "feed = :feedName and ";
+ // SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ // dateFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+ String dateLastUpdated = markerTimestamp;
+ Map valueMap = new HashMap();
+ valueMap.put(":id", new AttributeValue().withS(String.valueOf(markerId)));
+ valueMap.put(":dateLastUpdated", new AttributeValue().withS(dateLastUpdated));
+ valueMap.put(":feedName", new AttributeValue().withS(feedName));
+ for (Map.Entry res : map.entrySet()) {
+ valueMap.put(res.getKey(), new AttributeValue().withS(res.getValue()));
+ }
+ DynamoDBQueryExpression querySpec = new DynamoDBQueryExpression()
+ .withKeyConditionExpression("entryId = :id and dateLastUpdated <= :dateLastUpdated")
+ .withScanIndexForward(false)
+ .withLimit(pageSize).addExpressionAttributeNamesEntry(markerId, dateLastUpdated)
+ .withFilterExpression(feedNameFilter + filters)
+ .withExpressionAttributeValues(valueMap);
+ feedPage = mapper.query(PersistedEntry.class, querySpec);
+ return feedPage;
+ }
+
+
+ /**
+ * This method is used to return the categories based search with the help of feed and entryId and search type is forward
+ * whch means it will search for date less the markerDate along with other params.
+ *
+ * @param feedName: Name of the feed for each entry . For ex: namespace/feed
+ * @param markerTimestamp: Timestamp for which the search is to be performed for category
+ * @param markerId: EntryID for every event.
+ * @param searchString: The string on which search is performed in db
+ * @param pageSize: default size is 25
+ * @return List of PersistedEntry data found based on above params from db.
+ */
+ public List getFeedForward(String feedName,
+ String markerTimestamp,
+ String markerId,
+ String searchString,
+ int pageSize) {
+ List feedPage;
+ DynamoDBQueryBuilder sqlBac = new DynamoDBQueryBuilder(getSearchToSqlConverter()).searchString(searchString);
+ sqlBac.searchType(SearchType.FEED_FORWARD);
+ //Dynamodb query implementation
+ Map map = new HashMap();
+ String filters = sqlBac.getFilters(map);
+
+ // SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ // dateFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+ String dateLastUpdated = markerTimestamp;
+ Map valueMap = new HashMap();
+ valueMap.put(":id", new AttributeValue().withS(String.valueOf(markerId)));
+ valueMap.put(":dateLastUpdated", new AttributeValue().withS(dateLastUpdated));
+ valueMap.put(":feedName", new AttributeValue().withS(feedName));
+ for (Map.Entry res : map.entrySet()) {
+ valueMap.put(res.getKey(), new AttributeValue().withS(res.getValue()));
+ }
+ DynamoDBQueryExpression querySpec;
+ if(null != filters){
+ String feedNameFilter = "feed= :feedName and ";
+ querySpec = new DynamoDBQueryExpression()
+ .withKeyConditionExpression("entryId = :id and dateLastUpdated >= :dateLastUpdated")
+ .withScanIndexForward(true)
+ .withLimit(pageSize)
+ .withFilterExpression(feedNameFilter + filters)
+ .withExpressionAttributeValues(valueMap);
+ }else{
+ String feedNameFilter = "feed= :feedName";
+ querySpec = new DynamoDBQueryExpression()
+ .withKeyConditionExpression("entryId = :id and dateLastUpdated >= :dateLastUpdated")
+ .withScanIndexForward(true)
+ .withLimit(pageSize)
+ .withFilterExpression(feedNameFilter)
+ .withExpressionAttributeValues(valueMap);
+ }
+ feedPage = mapper.query(PersistedEntry.class, querySpec);
+ int abc = feedPage.size();
+ return feedPage;
+ }
+
+ /**
+ * This method is used to return list of entries from DynamoDb bases on feedName and timestamp.
+ *
+ * @param getFeedRequest : It contains the request object parameters
+ * @param startingAt : The timestamp from where we need to perform search
+ * @param pageSize : for pagination . Default page size is 25
+ * @return List of entries found based on given timestamp ,feed name.
+ */
+ //TODO LINK REF NEED TO BE IMPLEMENTED FOR HYDRATED FEED.
+ public AdapterResponse getFeedPageByTimestamp(GetFeedRequest getFeedRequest, String startingAt, int pageSize) throws Exception {
+ final String pageDirectionValue = getFeedRequest.getDirection();
+ ObjectMapper mapper = new ObjectMapper();
+ PageDirection pageDirection = PageDirection.FORWARD;
+ if (StringUtils.isNotEmpty(String.valueOf(pageDirection))) {
+ pageDirection = PageDirection.valueOf(pageDirectionValue.toUpperCase());
+ }
+ final String searchString = getFeedRequest.getSearchQuery() != null ? getFeedRequest.getSearchQuery() : "";
+ DateTimeFormatter isoDTF = ISODateTimeFormat.dateTime();
+ DateTime startAt = isoDTF.parseDateTime(startingAt);
+ List entryMarker = getEntryByTimestamp(startAt, getFeedRequest.getFeedName(), pageDirection);
+ if (entryMarker.isEmpty()) {
+ throw new RuntimeException("No entry with specified startingAt timestamp found");
+ }
+ // This list contains the persistent object in json string format
+ PersistedEntry persistedEntry = mapper.readValue(entryMarker.get(0), PersistedEntry.class);
+ //Convert String to Date Format
+ String lastDateUpdated = persistedEntry.getDateLastUpdated();
+ final Feed feed = hydrateFeed(getFeedRequest.getAbdera(),
+ enhancedGetFeedPage(getFeedRequest.getFeedName(),
+ lastDateUpdated,
+ persistedEntry.getEntryId(),
+ pageDirection,
+ searchString, pageSize),
+ getFeedRequest, pageSize);
+ return ResponseBuilder.found(feed);
+ }
+
+
+ /**
+ * Builds and runs the DynamoDB query that locates marker entries for a feed
+ * relative to a timestamp.
+ *
+ * @param markerDate startAt instant; must be in ISO 8601 date-time format and
+ * contain a time zone, for example: 2014-03-10T06:00:00.000Z.
+ * @param feed name of the feed being queried.
+ * @param direction BACKWARD matches dateLastUpdated at or before the marker;
+ * any other direction matches strictly after it.
+ * @return raw query results (JSON item strings) for the matching entries.
+ */
+ protected List getEntryByTimestamp(final DateTime markerDate, final String feed, PageDirection direction) {
+ ValueMap valueMap = new ValueMap();
+ valueMap.withString(":feed", feed);
+ // DateTime.toString() emits ISO 8601, matching the stored string format.
+ valueMap.withString(":dateLastUpdated", String.valueOf(markerDate));
+ return getQueryBuilderMethod(dynamoDB, "feed = :feed and " + getTimeStampValueFilter(direction) , null, valueMap);
+ }
+
+ /**
+ * This method is used to create a query for dynamodb for timestamp search based on direction
+ *
+ * @param direction: If the startingAt parameter is used without a direction parameter, then the forward direction is assumed.
+ * If you want to fetch feeds from a time period before the time specified in the time stamp,
+ * you need to use the direction parameter and then the backward description, like the following: direction set to backward.
+ * @return : the formed query based upon the direction.
+ */
+ private String getTimeStampValueFilter(PageDirection direction) {
+ if (direction.equals(PageDirection.BACKWARD)) {
+ return "dateLastUpdated <= :dateLastUpdated";
+ } else {
+ return "dateLastUpdated > :dateLastUpdated";
+ }
+ }
+
+ @Override
+ public FeedInformation getFeedInformation() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ /**
+ * This method returns the search result based on the type of operation like search getFeedHead,getFeedPageByTimestamp ect
+ *
+ * @param getFeedRequest: all the request type properties mentioned below ;
+ * List getCategories();
+ * String getSearchQuery();
+ * String getPageMarker();
+ * String getPageSize();
+ * String getDirection();
+ * String getStartingAt();
+ * @return
+ */
+ @Override
+ public AdapterResponse getFeed(GetFeedRequest getFeedRequest) {
+ AdapterResponse response = null;
+ TimerContext context = null;
+ int pageSize = DynamoDBConstant.PAGE_SIZE;
+ final String pageSizeString = getFeedRequest.getPageSize();
+ if (StringUtils.isNotBlank(pageSizeString)) {
+ pageSize = Integer.parseInt(pageSizeString);
+ }
+ final String marker = getFeedRequest.getPageMarker();
+ final String startingAt = getFeedRequest.getStartingAt();
+ if (StringUtils.isNotBlank(marker) && StringUtils.isNotBlank(startingAt)) {
+ response = ResponseBuilder.badRequest("'marker' parameter can not be used together with the 'startingAt' parameter");
+ return response;
+ }
+
+ try {
+
+ if (StringUtils.isBlank(marker) && StringUtils.isBlank(startingAt)) {
+
+ context = startTimer(String.format("get-feed-head-%s", getMetricBucketForPageSize(pageSize)));
+ response = getFeedHead(getFeedRequest, pageSize);
+ } else if (StringUtils.isNotBlank(marker) && marker.equals(DynamoDBConstant.MOCK_LAST_MARKER)) {
+
+ context = startTimer(String.format("get-last-page-%s", getMetricBucketForPageSize(pageSize)));
+ response = getLastPage(getFeedRequest, pageSize);
+ } else if (StringUtils.isNotBlank(marker)) {
+ context = startTimer(String.format("get-feed-page-%s", getMetricBucketForPageSize(pageSize)));
+ response = getFeedPage(getFeedRequest, marker, pageSize);
+ } else {
+ // we process 'startingAt' parameter here
+ context = startTimer(String.format("get-feed-page-startingAt-%s", getMetricBucketForPageSize(pageSize)));
+ response = getFeedPageByTimestamp(getFeedRequest, startingAt, pageSize);
+ }
+ } catch (IllegalArgumentException iae) {
+ response = ResponseBuilder.badRequest(iae.getMessage());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ stopTimer(context);
+ }
+
+ return response;
+ }
+
+ @Override
+ public AdapterResponse getEntry(GetEntryRequest getEntryRequest) {
+ final List entry = getEntry(getEntryRequest.getEntryId(), getEntryRequest.getFeedName());
+
+ AdapterResponse response = ResponseBuilder.notFound();
+
+ if (!entry.isEmpty()) {
+ response = ResponseBuilder.found(hydrateEntry(entry.get(0), getEntryRequest.getAbdera()));
+ }
+ return response;
+ }
+
+ /**
+ * This method returns the feed page based on the entryId for every event
+ *
+ * @param getFeedRequest
+ * @param marker: entry id fo every feed
+ * @param pageSize: default is 25,for pagination
+ * @throws ParseException
+ * @return: Response as a Http response
+ */
+
+ private AdapterResponse getFeedPage(GetFeedRequest getFeedRequest, String marker, int pageSize) throws ParseException {
+ final String pageDirectionValue = getFeedRequest.getDirection();
+ PageDirection pageDirection = PageDirection.FORWARD;
+ if (StringUtils.isNotEmpty(pageDirectionValue)) {
+ pageDirection = PageDirection.valueOf(pageDirectionValue.toUpperCase());
+ }
+ final String searchString = getFeedRequest.getSearchQuery() != null ? getFeedRequest.getSearchQuery() : "";
+ List entryMarker = getEntry(marker, getFeedRequest.getFeedName());
+ if (entryMarker.isEmpty()) {
+ return ResponseBuilder.notFound("No entry with specified marker found");
+ }
+ String lastDateUpdated = entryMarker.get(0).getDateLastUpdated();
+ final Feed feed = hydrateFeed(getFeedRequest.getAbdera(),
+ enhancedGetFeedPage(getFeedRequest.getFeedName(),
+ lastDateUpdated,
+ entryMarker.get(0).getEntryId(),
+ pageDirection,
+ searchString, pageSize),
+ getFeedRequest, pageSize);
+ return ResponseBuilder.found(feed);
+ }
+
+ /**
+ * Fetches one page of entries in the requested direction, timing the
+ * database call under a direction/category-specific metric name.
+ *
+ * @param feedName name of the feed
+ * @param markerTimestamp timestamp of the marker entry to page from
+ * @param markerId entry id of the marker entry
+ * @param direction FORWARD or BACKWARD relative to the marker
+ * @param searchString category search string; may be empty
+ * @param pageSize page size, default 25, used for pagination
+ * @return the page of PersistedEntry objects (empty when nothing matches)
+ */
+ private List enhancedGetFeedPage(final String feedName, final String markerTimestamp,
+ final String markerId,
+ final PageDirection direction, final String searchString,
+ final int pageSize) {
+ List feedPage = new LinkedList();
+
+ TimerContext context = null;
+
+ // A non-blank search string selects the "-with-cats" metric variants.
+ boolean hasCats = !searchString.trim().isEmpty();
+
+ try {
+ switch (direction) {
+ case FORWARD:
+
+
+ if (hasCats) {
+ context = startTimer(String.format("db-get-feed-page-forward-with-cats-%s",
+ getMetricBucketForPageSize(pageSize)));
+ } else {
+ context = startTimer(
+ String.format("db-get-feed-page-forward-%s", getMetricBucketForPageSize(pageSize)));
+ }
+ feedPage = getFeedForward(feedName,
+ markerTimestamp,
+ markerId,
+ searchString,
+ pageSize);
+
+ // Forward queries are scanned ascending; reverse so the page is
+ // presented newest-first like the backward path.
+ Collections.reverse(feedPage);
+ break;
+
+ case BACKWARD:
+
+
+ if (hasCats) {
+ context = startTimer(String.format("db-get-feed-page-backward-with-cats-%s",
+ getMetricBucketForPageSize(pageSize)));
+ } else {
+ context = startTimer(
+ String.format("db-get-feed-page-backward-%s", getMetricBucketForPageSize(pageSize)));
+ }
+ feedPage = getFeedBackward(feedName,
+ markerTimestamp,
+ markerId,
+ searchString,
+ pageSize);
+ break;
+ }
+ } finally {
+ // Safe when the timer was never started (stopTimer ignores null).
+ stopTimer(context);
+ }
+
+ return feedPage;
+ }
+
+ /**
+ * This method is used to return the last page of the particular feed based on entryID
+ *
+ * @param getFeedRequest: Its has all required feed request objects
+ * @param pageSize: Page size for pagination ,default size is 25
+ * @return
+ */
+
+ private AdapterResponse getLastPage(GetFeedRequest getFeedRequest, int pageSize) {
+
+ final String searchString = getFeedRequest.getSearchQuery() != null ? getFeedRequest.getSearchQuery() : "";
+ AdapterResponse response;
+
+ final Feed feed = hydrateFeed(getFeedRequest.getAbdera(),
+ enhancedGetLastPage(getFeedRequest.getFeedName(), pageSize, searchString),
+ getFeedRequest, pageSize);
+ response = ResponseBuilder.found(feed);
+
+ return response;
+ }
+
+ /**
+ * This method returns the Last Page of the feed by performing union of both the select statements output
+ *
+ * @param feedName: FeedName
+ * @param pageSize: for pagination
+ * @param searchString: Search Category to be passed
+ * @return List of persistent Object for union results
+ */
+ private List enhancedGetLastPage(final String feedName, final int pageSize,
+ final String searchString) {
+
+ List categoriesList = getSearchToSqlConverter().getParamsFromSearchString(searchString);
+ int numCats = categoriesList.size();
+
+ String filterExpression = null;
+
+ if (numCats > 0) {
+ filterExpression = "(contains(categories, :categories))";
+ }
+
+ TimerContext context = null;
+ try {
+ if (numCats > 0) {
+ context = startTimer(
+ String.format("db-get-last-page-with-cats-%s", getMetricBucketForPageSize(pageSize)));
+ } else {
+ context = startTimer(String.format("db-get-last-page-%s", getMetricBucketForPageSize(pageSize)));
+ }
+
+ List feedPage;
+ ValueMap valueMap = new ValueMap();
+ valueMap.withString(":feed", feedName);
+ feedPage = getQueryBuilderMethod(dynamoDB, "feed = :feed and dateLastUpdated < :dateLastUpdated" ,filterExpression, pageSize, valueMap, true);
+ // comparator is written to perform sorting based on if two dates are equal then sort output based on entryId in desc order
+ List persistedEntryList = JsonUtil.getPersistenceEntity(feedPage);
+ Collections.sort(persistedEntryList, (a, b) -> {
+ SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ String dateLastUpdated = dateFormatter.format(a.getDateLastUpdated());
+ String dateLastUpdatedNextDate = dateFormatter.format(b.getDateLastUpdated());
+ if (dateLastUpdatedNextDate.equals(dateLastUpdated)) {
+ return b.getEntryId().compareTo(a.getEntryId());
+ }
+ return -1;
+ });
+
+ } finally {
+ stopTimer(context);
+ }
+ return null;
+ }
+
+ /**
+ * Builds the head (newest) page of the feed and, for non-archived feeds,
+ * appends the rel="last" link pointing at the mock last-page marker.
+ *
+ * @param getFeedRequest request carrying the feed name, search and abdera
+ * @param pageSize page size for pagination, default 25
+ * @return 200 response with the hydrated feed head
+ */
+ private AdapterResponse getFeedHead(GetFeedRequest getFeedRequest, int pageSize) {
+ final Abdera abdera = getFeedRequest.getAbdera();
+
+ final String searchString = getFeedRequest.getSearchQuery() != null ? getFeedRequest.getSearchQuery() : "";
+
+ List persistedEntries = getFeedHead(getFeedRequest.getFeedName(), pageSize, searchString);
+
+ Feed hydratedFeed = hydrateFeed(abdera, persistedEntries, getFeedRequest, pageSize);
+
+ // Set the last link in the feed head
+ final String baseFeedUri = decode(getFeedRequest.urlFor(
+ new EnumKeyedTemplateParameters(URITemplate.FEED)));
+
+ // Archived feeds omit the "last" link (the archive carries its own links).
+ if (!helper.isArchived()) {
+
+ hydratedFeed.addLink(
+ new StringBuilder().append(baseFeedUri)
+ .append(DynamoDBConstant.MARKER_EQ).append(DynamoDBConstant.MOCK_LAST_MARKER)
+ .append(DynamoDBConstant.AND_LIMIT_EQ).append(String.valueOf(pageSize))
+ .append(DynamoDBConstant.AND_SEARCH_EQ).append(urlEncode(searchString))
+ .append(DynamoDBConstant.AND_DIRECTION_EQ_BACKWARD).toString())
+ .setRel(Link.REL_LAST);
+ }
+
+ return ResponseBuilder.found(hydratedFeed);
+
+ }
+
+ private String getMetricBucketForPageSize(int pageSize) {
+ if (pageSize > 0 && pageSize <= 249) {
+ return "tiny";
+ } else if (pageSize >= 250 && pageSize <= 499) {
+ return "small";
+ } else if (pageSize >= 500 && pageSize <= 749) {
+ return "medium";
+ } else {
+ // 750 - 1000
+ return "large";
+ }
+ }
+
+ /**
+ * Queries DynamoDB for the newest entries of a feed (the feed head),
+ * optionally filtered by categories, under a timed metric.
+ *
+ * @param feedName name of the feed
+ * @param pageSize page size, default 25, used for pagination
+ * @param searchString category search string used for filtering the query
+ * @return the PersistedEntry objects making up the feed head
+ */
+ private List getFeedHead(final String feedName, final int pageSize, final String searchString) {
+ List categoriesList = getSearchToSqlConverter().getParamsFromSearchString(searchString);
+ int numCats = categoriesList.size();
+
+
+ String filterExpression = null;
+
+ if (numCats > 0) {
+ filterExpression = "(contains(categories, :categories))";
+ }
+
+ TimerContext context = null;
+ try {
+ if (numCats > 0) {
+ context = startTimer(
+ String.format("db-get-feed-head-with-cats-%s", getMetricBucketForPageSize(pageSize)));
+ } else {
+ context = startTimer(String.format("db-get-feed-head-%s", getMetricBucketForPageSize(pageSize)));
+ }
+
+ List feedPage;
+ ValueMap valueMap = new ValueMap();
+ valueMap.withString(":feed", feedName);
+ // NOTE(review): each iteration overwrites the single ":categories"
+ // placeholder, so only the LAST category takes effect — confirm whether
+ // multi-category searches should build numbered placeholders instead.
+ for(String s: categoriesList){
+ valueMap.withString(":categories", s);
+ }
+ feedPage = getQueryBuilderMethod(dynamoDB, "feed = :feed",filterExpression, pageSize, valueMap, false);
+ List persistedEntryList = JsonUtil.getPersistenceEntity(feedPage);
+ return persistedEntryList;
+ } finally {
+ stopTimer(context);
+ }
+ }
+
+ @Override
+ public void setCurrentUrl(URL urlCurrent) {
+ helper.setCurrentUrl(urlCurrent);
+ }
+
+ @Override
+ public void setArchiveUrl(URL url) {
+ helper.setArchiveUrl(url);
+ }
+
+ private TimerContext startTimer(String name) {
+ if (enableTimers) {
+ final com.yammer.metrics.core.Timer timer = Metrics.newTimer(getClass(), name, TimeUnit.MILLISECONDS,
+ TimeUnit.SECONDS);
+ TimerContext context = timer.time();
+ return context;
+ } else {
+ return null;
+ }
+
+
+ }
+
+ /**
+ * Converts a page of persisted entries into an Abdera Feed, wiring up the
+ * current/self links, previous/next paging links (next only when a further
+ * marker exists), an archive link when applicable, and optional short-page
+ * diagnostics logging.
+ *
+ * @param abdera abdera instance used to build the feed
+ * @param persistedEntries the page of entries, newest first; may be empty
+ * @param getFeedRequest originating request (used for URIs and search)
+ * @param pageSize requested page size, used in link query strings
+ * @return the fully hydrated feed
+ */
+ private Feed hydrateFeed(Abdera abdera, List persistedEntries,
+ GetFeedRequest getFeedRequest, final int pageSize) {
+ final Feed hydratedFeed = abdera.newFeed();
+ final String baseFeedUri = decode(getFeedRequest.urlFor(
+ new EnumKeyedTemplateParameters(URITemplate.FEED)));
+ final String searchString = getFeedRequest.getSearchQuery() != null ? getFeedRequest.getSearchQuery() : "";
+ if (helper.isArchived()) {
+
+ helper.addArchiveNode(hydratedFeed);
+ }
+
+ // Set the feed links
+ addFeedCurrentLink(hydratedFeed, baseFeedUri);
+ addFeedSelfLink(hydratedFeed, baseFeedUri, getFeedRequest, pageSize, searchString);
+
+
+ PersistedEntry nextEntry = null;
+
+ // TODO: We should have a link builder method for these
+ if (!(persistedEntries.isEmpty())) {
+ hydratedFeed.setId(DynamoDBConstant.UUID_URI_SCHEME + UUID.randomUUID().toString());
+ hydratedFeed.setTitle(persistedEntries.get(0).getFeed());
+
+ // Set the previous link
+ hydratedFeed.addLink(new StringBuilder()
+ .append(baseFeedUri).append(DynamoDBConstant.MARKER_EQ)
+ .append(persistedEntries.get(0).getEntryId())
+ .append(DynamoDBConstant.AND_LIMIT_EQ).append(String.valueOf(pageSize))
+ .append(DynamoDBConstant.AND_SEARCH_EQ).append(urlEncode(searchString))
+ .append(DynamoDBConstant.AND_DIRECTION_EQ_FORWARD).toString())
+ .setRel(helper.getPrevLink());
+
+ final PersistedEntry lastEntryInCollection = persistedEntries.get(persistedEntries.size() - 1);
+
+ // A null next marker means this is the final page.
+ nextEntry = getNextMarker(lastEntryInCollection, getFeedRequest.getFeedName(), searchString);
+
+ if (nextEntry != null) {
+ // Set the next link
+ hydratedFeed.addLink(new StringBuilder().append(baseFeedUri)
+ .append(DynamoDBConstant.MARKER_EQ).append(nextEntry.getEntryId())
+ .append(DynamoDBConstant.AND_LIMIT_EQ).append(String.valueOf(pageSize))
+ .append(DynamoDBConstant.AND_SEARCH_EQ).append(urlEncode(searchString))
+ .append(DynamoDBConstant.AND_DIRECTION_EQ_BACKWARD).toString())
+ .setRel(helper.getNextLink());
+ }
+ }
+
+ // With no further pages, point "next" at the archive when one is configured.
+ if (nextEntry == null && helper.getArchiveUrl() != null) {
+ hydratedFeed.addLink(new StringBuilder().append(helper.getArchiveUrl()).append(DynamoDBConstant.LIMIT_EQ).append(String.valueOf(pageSize))
+ .append(DynamoDBConstant.AND_DIRECTION_EQ_BACKWARD).toString())
+ .setRel(FeedSource.REL_ARCHIVE_NEXT);
+ }
+
+ for (PersistedEntry persistedFeedEntry : persistedEntries) {
+ hydratedFeed.addEntry(hydrateEntry(persistedFeedEntry, abdera));
+ }
+
+ // Optional diagnostics when the page came back shorter than requested.
+ if (getEnableLoggingOnShortPage()) {
+ if (hydratedFeed.getEntries() != null && hydratedFeed.getEntries().size() < pageSize) {
+ LOG.warn("User requested " + getFeedRequest.getFeedName() + " feed with limit " + pageSize + ", but returning only " + hydratedFeed.getEntries().size());
+ List entries = hydratedFeed.getEntries();
+ StringBuilder sb = new StringBuilder();
+ for (int idx = 0; idx < entries.size(); idx++) {
+ Entry entry = entries.get(idx);
+ sb.append(entry.getId() + ", ");
+ }
+ LOG.warn("UUIDs: " + sb.toString());
+ } else if (hydratedFeed.getEntries() == null) {
+ LOG.warn("User requested " + getFeedRequest.getFeedName() + " feed with limit " + pageSize + ", but no entries are available");
+ }
+ }
+
+ return hydratedFeed;
+ }
+
+ /**
+ * This method is used to get the next marker based for the markerID and the feedName
+ *
+ * @param persistedEntry: PersistentEntryModel
+ * @param feedName: name of the feed
+ * @param searchString: Category search string for filteration
+ * @return Persistent model Object
+ */
+ private PersistedEntry getNextMarker(final PersistedEntry persistedEntry, final String feedName,
+ final String searchString) {
+
+ List categoriesList = getSearchToSqlConverter().getParamsFromSearchString(searchString);
+ int numCats = categoriesList.size();
+
+ String filterExpression = null;
+
+ if (numCats > 0) {
+ filterExpression = "(contains(categories, :categories))";
+ }
+
+
+
+ List firstUnionPersistentList;
+ ValueMap valueMap = new ValueMap();
+ valueMap.withString(":feed", persistedEntry.getFeed());
+
+ for(String s: categoriesList){
+ if(s.charAt(0) == '{'){
+ valueMap.withString(":categories", s.substring(1,s.length() -1));
+ }else{
+ valueMap.withString(":categories", s);
+ }
+
+ }
+
+ firstUnionPersistentList = getQueryBuilderMethod(dynamoDB, "feed = :feed ",filterExpression, valueMap);
+ List persistedEntryList = JsonUtil.getPersistenceEntity(firstUnionPersistentList);
+ return persistedEntryList.get(0);
+ }
+
+
+ /**
+ * Adds the rel="self" link to the feed, reproducing the request's paging
+ * parameters (limit, search, marker, direction) in a fixed order.
+ *
+ * @param feed feed being hydrated
+ * @param baseFeedUri decoded base URI of the feed
+ * @param getFeedRequest originating request (marker/direction source)
+ * @param pageSize requested page size
+ * @param searchString category search string; empty when absent
+ */
+ private void addFeedSelfLink(Feed feed, String baseFeedUri, GetFeedRequest getFeedRequest, int pageSize, String searchString) {
+ StringBuilder queryParams = new StringBuilder();
+ boolean markerIsSet = false;
+
+ queryParams.append(baseFeedUri).append(DynamoDBConstant.LIMIT_EQ).append(
+ String.valueOf(pageSize));
+
+ if (searchString.length() > 0) {
+ queryParams.append(DynamoDBConstant.AND_SEARCH_EQ).append(urlEncode(searchString));
+ }
+ if (getFeedRequest.getPageMarker() != null && getFeedRequest.getPageMarker().length() > 0) {
+ queryParams.append(DynamoDBConstant.AND_MARKER_EQ).append(getFeedRequest.getPageMarker());
+ markerIsSet = true;
+ }
+ // A marker implies the request's own direction; otherwise default backward.
+ if (markerIsSet) {
+ queryParams.append(DynamoDBConstant.AND_DIRECTION_EQ).append(getFeedRequest.getDirection());
+ } else {
+ queryParams.append(DynamoDBConstant.AND_DIRECTION_EQ_BACKWARD);
+ if (queryParams.toString().equalsIgnoreCase(
+ baseFeedUri + DynamoDBConstant.LIMIT_EQ + "25" + DynamoDBConstant.AND_DIRECTION_EQ_BACKWARD)) {
+ // They are calling the feedhead, just use the base feed uri
+ // This keeps the validator at http://validator.w3.org/ happy
+ queryParams.delete(0, queryParams.toString().length()).append(
+ baseFeedUri);
+ }
+ }
+ feed.addLink(queryParams.toString()).setRel(Link.REL_SELF);
+ }
+
+
+ private void addFeedCurrentLink(Feed hydratedFeed, String baseFeedUri) {
+ String url = helper.isArchived() ? helper.getCurrentUrl() : baseFeedUri;
+
+ hydratedFeed.addLink(url, Link.REL_CURRENT);
+ }
+
+
+ /**
+ * Parses a persisted entry's stored Atom body back into an Abdera Entry
+ * and stamps it with the persisted updated/published dates.
+ *
+ * NOTE(review): setUpdated/setPublished are fed getDateLastUpdated()/
+ * getCreationDate() directly, while other call sites treat
+ * getDateLastUpdated() as a String — confirm the PersistedEntry model's
+ * accessor types (Abdera's setters expect Date).
+ *
+ * @param persistedEntry stored entry whose body is parsed
+ * @param abderaReference abdera instance providing the parser
+ * @return the parsed entry, or null if parsing yielded no document
+ */
+ private Entry hydrateEntry(PersistedEntry persistedEntry, Abdera abderaReference) {
+
+ final Document hydratedEntryDocument = abderaReference.getParser().parse(
+ new StringReader(persistedEntry.getEntryBody()));
+
+ Entry entry = null;
+
+ if (hydratedEntryDocument != null) {
+ entry = hydratedEntryDocument.getRoot();
+ entry.setUpdated(persistedEntry.getDateLastUpdated());
+ entry.setPublished(persistedEntry.getCreationDate());
+ }
+
+ return entry;
+ }
+
+ /**
+ * @param context
+ */
+ private void stopTimer(TimerContext context) {
+ if (enableTimers && context != null) {
+ context.stop();
+ }
+ }
+
+ /**
+ * @param searchString
+ * @return
+ */
+ private String urlEncode(String searchString) {
+ try {
+ return URLEncoder.encode(searchString, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ //noop - should never get here
+ return "";
+ }
+ }
+
+ /**
+ * Fetches the entry from DynamoDB based upon two parameters.
+ *
+ * @param entryId: the marker id of the entry for each event
+ * @param feedName: the feed name used to search the records in DynamoDB
+ * @return : list of entries found in DynamoDB matching the entryId and feedName.
+ */
+ private List getEntry(String entryId, final String feedName) {
+ Gson gson = new Gson();
+ List persistedEntriesObject = new ArrayList();
+ Table table = dynamoDB.getTable(DynamoDBConstant.ENTRIES);
+ Index index = table.getIndex(DynamoDBConstant.ENTRY_ID_FEED_INDEX);
+ QuerySpec spec = new QuerySpec()
+ .withKeyConditionExpression("entryId = :entryId and feed = :feed")
+ .withValueMap(new ValueMap()
+ .withString(":entryId", entryId)
+ .withString(":feed", feedName));
+ ItemCollection persistedEntryItems = index.query(spec);
+ Iterator- itemsIterator = persistedEntryItems.iterator();
+ while (itemsIterator.hasNext()) {
+ Item item = itemsIterator.next();
+ persistedEntriesObject.add(gson.fromJson(item.toJSONPretty(), PersistedEntry.class));
+ }
+ return persistedEntriesObject;
+ }
+
+ /**
+ * This method is used to build a query based on the condition expression (e.g. a where clause) and other params.
+ *
+ * @param dynamoDB: object of dynamoDb
+ * @param conditionExpression: Filter expression based on which results are filtered (where clause conditions)
+ * @param pageSize: page size for pagination
+ * @param valueMap: map for keeping the mapped values passed in a condition expression
+ * @return List of String in json format.
+ */
+ public List getQueryBuilderMethod(DynamoDB dynamoDB, String conditionExpression,String filterExpression, int pageSize, ValueMap valueMap, boolean orderBy) {
+ List feedPage = new ArrayList<>();
+ Table table = dynamoDB.getTable("entries");
+ Index index = table.getIndex("global-feed-index");
+ QuerySpec spec = null;
+ if(null!= filterExpression){
+ spec = new QuerySpec()
+ .withKeyConditionExpression(conditionExpression)
+ .withFilterExpression(filterExpression)
+ .withValueMap(valueMap)
+ .withMaxPageSize(pageSize) // max number of items per page
+ .withScanIndexForward(orderBy);
+ }else{
+ spec = new QuerySpec()
+ .withKeyConditionExpression(conditionExpression)
+ .withValueMap(valueMap)
+ .withMaxPageSize(pageSize) // max number of items per page
+ .withScanIndexForward(orderBy);
+ }
+ // NOTE(review): sort order on dateLastUpdated is controlled by withScanIndexForward above (false = descending)
+ ItemCollection persistedEntryItems = null;
+ try{
+ persistedEntryItems = index.query(spec);
+ }catch(Exception e){
+ LOG.error("Exception " + e + e.getMessage());
+ }
+
+ Iterator
- itemsIterator = persistedEntryItems.iterator();
+ while (itemsIterator.hasNext()) {
+ Item item = itemsIterator.next();
+ feedPage.add(item.toJSONPretty());
+ }
+ return feedPage;
+ }
+
+ /**
+ * This overloaded method is used to build a query based on the condition expression (e.g. a where clause) and other params.
+ *
+ * @param dynamoDB: object of dynamoDb
+ * @param conditionExpression: Filter expression based on which results are filtered (where clause conditions)
+ * @param valueMap: map for keeping the mapped values passed in a condition expression
+ * @return List of String in json format.
+ */
+ public List getQueryBuilderMethod(DynamoDB dynamoDB, String conditionExpression,String filterExpression, ValueMap valueMap) {
+
+ List feedPage = new ArrayList<>();
+ Table table = dynamoDB.getTable("entries");
+ Index index = table.getIndex("global-feed-index");
+ QuerySpec spec;
+ if(null != filterExpression){
+ spec = new QuerySpec()
+ .withKeyConditionExpression(conditionExpression)
+ .withFilterExpression(filterExpression)
+ .withScanIndexForward(true)
+ .withValueMap(valueMap);
+ }else{
+ spec = new QuerySpec()
+ .withKeyConditionExpression(conditionExpression)
+ .withScanIndexForward(true)
+ .withValueMap(valueMap);
+ }
+
+ ItemCollection persistedEntryItems = null;
+ try{
+ persistedEntryItems = index.query(spec);
+ }catch(Exception e){
+ LOG.error("Exception : " + e + e.getMessage()) ;
+ }
+
+ Iterator
- itemsIterator = persistedEntryItems.iterator();
+ while (itemsIterator.hasNext()) {
+ Item item = itemsIterator.next();
+ feedPage.add(item.toJSONPretty());
+ }
+ return feedPage;
+ }
+}
diff --git a/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/constant/DynamoDBConstant.java b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/constant/DynamoDBConstant.java
new file mode 100644
index 000000000..5dc674ee7
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/constant/DynamoDBConstant.java
@@ -0,0 +1,26 @@
+package org.atomhopper.dynamodb.constant;
+
+/**
+ * Constant file for all the constants declared in the module.
+ */
+public final class DynamoDBConstant {
+ public static final String UUID_URI_SCHEME = "urn:uuid:";
+ public static final String LINK_REL_SELF = "self";
+ public static final String ENTRIES = "entries";
+ public static final String ENTRY_ID_FEED_INDEX = "entryId-feed-index";
+ public static final String ENTRY_ID = "entryId";
+ public static final String FEED = "feed";
+ public static final int PAGE_SIZE=25;
+ public static final String MOCK_LAST_MARKER = "last";
+ public static final String MARKER_EQ = "?marker=";
+ public static final String LIMIT_EQ = "?limit=";
+ public static final String AND_SEARCH_EQ = "&search=";
+ public static final String AND_LIMIT_EQ = "&limit=";
+ public static final String AND_MARKER_EQ = "&marker=";
+ public static final String AND_DIRECTION_EQ = "&direction=";
+ public static final String AND_DIRECTION_EQ_BACKWARD = "&direction=backward";
+ public static final String AND_DIRECTION_EQ_FORWARD = "&direction=forward";
+ public static final String DATE_LAST_UPDATED="dateLastUpdated";
+ private DynamoDBConstant() {
+ }
+}
diff --git a/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/model/PersistedEntry.java b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/model/PersistedEntry.java
new file mode 100644
index 000000000..413aaf1ac
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/model/PersistedEntry.java
@@ -0,0 +1,74 @@
+package org.atomhopper.dynamodb.model;
+
+import com.amazonaws.services.dynamodbv2.datamodeling.*;
+import org.atomhopper.dynamodb.constant.DynamoDBConstant;
+
+import java.util.*;
+
+
+@DynamoDBTable(tableName = DynamoDBConstant.ENTRIES)
+public class PersistedEntry {
+
+ @DynamoDBHashKey(attributeName = DynamoDBConstant.ENTRY_ID)
+ private String entryId;
+ @DynamoDBIndexRangeKey(attributeName = DynamoDBConstant.FEED, localSecondaryIndexName = "entryId-feed-index")
+ @DynamoDBIndexHashKey(globalSecondaryIndexName = "global-feed-index", attributeName = DynamoDBConstant.FEED)
+ private String feed;
+ private String entryBody;
+ @DynamoDBAutoGeneratedTimestamp(strategy = DynamoDBAutoGenerateStrategy.CREATE)
+ private String creationDate;
+ @DynamoDBIndexRangeKey(globalSecondaryIndexName = "global-feed-index", attributeName = DynamoDBConstant.DATE_LAST_UPDATED)
+ @DynamoDBRangeKey(attributeName = DynamoDBConstant.DATE_LAST_UPDATED)
+ private String dateLastUpdated;
+ private List categories;
+
+ public String getEntryId() {
+ return entryId;
+ }
+
+ public void setEntryId(String entryId) {
+ this.entryId = entryId;
+ }
+
+ public String getFeed() {
+ return feed;
+ }
+
+ public void setFeed(String feed) {
+ this.feed = feed;
+ }
+
+ public String getEntryBody() {
+ return entryBody;
+ }
+
+ public void setEntryBody(String entryBody) {
+ this.entryBody = entryBody;
+ }
+
+ public String getCreationDate() {
+ return creationDate;
+ }
+
+ public void setCreationDate(String creationDate) {
+ this.creationDate = creationDate;
+ }
+
+ public String getDateLastUpdated() {
+ return dateLastUpdated;
+ }
+
+ public void setDateLastUpdated(String dateLastUpdated) {
+ this.dateLastUpdated = dateLastUpdated;
+ }
+
+ public List getCategories() {
+ return categories;
+ }
+
+ public void setCategories(List categories) {
+ this.categories = categories;
+ }
+
+
+}
diff --git a/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/CategoryStringGenerator.java b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/CategoryStringGenerator.java
new file mode 100644
index 000000000..65ed56997
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/CategoryStringGenerator.java
@@ -0,0 +1,144 @@
+package org.atomhopper.dynamodb.query;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public final class CategoryStringGenerator {
+
+ private static final char INCLUSIVE_OPERATOR = '+', ESCAPE_OPERATOR = '\\';
+ private static final char[] OPERATORS = {INCLUSIVE_OPERATOR, ESCAPE_OPERATOR};
+
+ private CategoryStringGenerator() {
+ throw new AssertionError();
+ }
+
+ /**
+ * Takes a search string, prefix map & prefix character & returns an array of strings which are arguments to the
+ * corresponding generated SQL statement.
+ *
+ * See SearchToSqlConverter for more details.
+ *
+ * @param searchString
+ * @param mapPrefix
+ * @param prefixSplit
+ *
+ * @return
+ */
+ public static List getPostgresCategoryString(String searchString, Map mapPrefix, String prefixSplit ) {
+
+ List finalList = new ArrayList();
+ if (searchString == null || !(searchString.trim().length() > 0)) {
+ finalList.add( "{}" );
+ return finalList;
+ }
+
+ List categories = parse(searchString.trim().toLowerCase());
+ List catHolder = new ArrayList();
+
+ // find if any categories are prefixed, if so, split them out.
+ for( String cat : categories ) {
+ if (cat.matches( SQLToNoSqlConverter.BAD_SEARCH_REGEX ) ) {
+ throw new IllegalArgumentException( SQLToNoSqlConverter.BAD_CHAR_MSG );
+ }
+
+ if (prefixSplit != null ) {
+ int index = cat.indexOf( prefixSplit );
+ if ( index != -1 ) {
+ String prefix = cat.substring( 0, index );
+
+ if ( mapPrefix.containsKey( prefix ) ) {
+ addToFinalList( finalList, catHolder );
+ finalList.add( cat );
+ }
+ else {
+ catHolder.add( cat );
+ }
+ }
+ else {
+ catHolder.add( cat );
+ }
+ }
+ else {
+ catHolder.add( cat );
+ }
+ }
+
+ addToFinalList( finalList, catHolder );
+ return finalList;
+ }
+
+ private static void addToFinalList( List finalList, List catHolder ) {
+
+ if ( !catHolder.isEmpty() ) {
+
+ String psArray = DynamoDBTextArray.stringArrayToPostgreSQLTextArray( catHolder.toArray( new String[ catHolder
+ .size() ] ) );
+ finalList.add( psArray );
+
+ catHolder.clear();
+ }
+ }
+
+ private static List parse(String searchString) {
+ List categories = new ArrayList();
+
+ for (int charIndex = 0; charIndex < searchString.length(); charIndex++) {
+ final char nextOperator = searchString.charAt(charIndex);
+ final StringBuilder searchTermBuilder = new StringBuilder();
+
+ charIndex = readTerm(searchString, searchTermBuilder, charIndex + 1);
+
+ switch (nextOperator) {
+ case INCLUSIVE_OPERATOR:
+ if( searchTermBuilder.toString().isEmpty() ) {
+
+ throw new IllegalArgumentException( "Invalid Search Parameter: Parameter cannot be empty string." );
+ }
+
+ categories.add(searchTermBuilder.toString());
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ return categories;
+ }
+
+ private static int readTerm(String searchString, StringBuilder builder, int currentCharIndex) {
+ int charIndex = currentCharIndex;
+ boolean isEscaped = false;
+
+
+ while( charIndex < searchString.length() ) {
+ final char nextChar = searchString.charAt(charIndex);
+
+ if (isEscaped || !isOperator(nextChar)) {
+ builder.append( nextChar );
+ isEscaped = false;
+ } else {
+ if (nextChar == ESCAPE_OPERATOR) {
+ isEscaped = true;
+ } else {
+ return charIndex - 1;
+ }
+ }
+
+ charIndex++;
+ }
+
+ return charIndex;
+ }
+
+ private static boolean isOperator(char character) {
+ for (char operator : OPERATORS) {
+ if (operator == character) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/DynamoDBQueryBuilder.java b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/DynamoDBQueryBuilder.java
new file mode 100644
index 000000000..f7c901f7d
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/DynamoDBQueryBuilder.java
@@ -0,0 +1,79 @@
+package org.atomhopper.dynamodb.query;
+
+import org.joda.time.DateTime;
+
+import java.util.Map;
+
+public class DynamoDBQueryBuilder {
+ private String searchString;
+ private SearchType type;
+ private int feedHeadDelayInSeconds = -1;
+ private DateTime startingTimestamp;
+
+ private static final String EQUALS = "=";
+ private static final String LESS_THAN = "<";
+ private static final String GREATER_THAN = ">";
+
+ private static final String QUESTION_MARK = "?";
+
+ private static final String OPEN_PARENS = "(";
+ private static final String CLOSE_PARENS = ")";
+
+ private static final String SELECT = "SELECT * FROM entries WHERE feed = ?";
+ private static final String AND = "AND";
+ private static final String SPACE = " ";
+ private static final String DATELASTUPDATED = "datelastupdated %s ?";
+ private static final String ID = "id %s ?";
+
+ private static final String UNION_ALL = "UNION ALL";
+
+ private static final String ORDER_BY_ASC = "ORDER BY datelastupdated ASC, id ASC LIMIT ?";
+ private static final String ORDER_BY_ASC_LIMIT = "ORDER BY datelastupdated ASC, id ASC LIMIT %s";
+ private static final String ORDER_BY_DATE_ASC_ID_DESC_LIMIT = "ORDER BY datelastupdated ASC, id DESC LIMIT %s";
+ private static final String ORDER_BY_DESC_LIMIT = "ORDER BY datelastupdated DESC, id DESC LIMIT %s";
+ private static final String ORDER_BY_DATE_DESC_ID_ASC_LIMIT = "ORDER BY datelastupdated DESC, id ASC LIMIT %s";
+ private static final String DB_TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS z";
+
+ private SQLToNoSqlConverter SQLToNoSqlConverter;
+
+ public DynamoDBQueryBuilder(SQLToNoSqlConverter converter ) {
+
+ SQLToNoSqlConverter = converter;
+ }
+
+ public DynamoDBQueryBuilder searchString(String searchString) {
+ this.searchString = searchString;
+ return this;
+ }
+
+ public DynamoDBQueryBuilder searchType(SearchType type) {
+ this.type = type;
+ return this;
+ }
+
+ public DynamoDBQueryBuilder feedHeadDelayInSeconds(int delay) {
+ this.feedHeadDelayInSeconds = delay;
+ return this;
+ }
+
+ public DynamoDBQueryBuilder startingTimestamp(DateTime timestamp) {
+ this.startingTimestamp = timestamp;
+ return this;
+ }
+
+ public String getFilters(Map map) {
+
+ String searchSql = SQLToNoSqlConverter.getSqlFromSearchString(searchString,map);
+
+ switch (type) {
+
+ case FEED_BACKWARD:
+ return searchSql;
+
+ case FEED_FORWARD:
+ return searchSql;
+ }
+ return null;
+ }
+
+}
diff --git a/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/DynamoDBTextArray.java b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/DynamoDBTextArray.java
new file mode 100644
index 000000000..4d1de4356
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/DynamoDBTextArray.java
@@ -0,0 +1,198 @@
+package org.atomhopper.dynamodb.query;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * This class provides the {@link java.sql.Array} interface for a PostgreSQL
+ *
text array.
+ *
+ *
+ */
+public class DynamoDBTextArray implements java.sql.Array {
+
+ private final String[] stringArray;
+ private final String stringValue;
+
+ /**
+ * Initializing constructor
+ *
+ * @param stringArray
+ */
+ public DynamoDBTextArray(String[] stringArray) {
+ if (stringArray == null) {
+ this.stringArray = null;
+ } else {
+ this.stringArray = Arrays.copyOf(stringArray, stringArray.length);
+ }
+ this.stringValue = stringArrayToPostgreSQLTextArray(this.stringArray);
+ }
+
+ @Override
+ public String toString() {
+ return stringValue;
+ }
+ private static final String NULL = "NULL";
+
+ /**
+ * This static method can be used to convert a string array to the string
+ * representation of PostgreSQL text array.
+ *
+ * @param stringArray source String array
+ * @return string representation of a given text array
+ */
+ public static String stringArrayToPostgreSQLTextArray(String[] stringArray) {
+ final int arrayLength;
+ final int bufferAddition = 4;
+ if (stringArray == null) {
+ return NULL;
+ }
+
+ arrayLength = stringArray.length;
+
+ if (arrayLength == 0) {
+ return "{}";
+ }
+
+ // count the string length and if need to quote
+ int neededBufferLength = 2;
+ // count the beginning '{' and the ending '}' brackets
+ boolean[] shouldQuoteArray = new boolean[stringArray.length];
+ for (int si = 0; si < arrayLength; si++) {
+ // count the comma after the first element
+ if (si > 0) {
+ neededBufferLength++;
+ }
+
+ boolean shouldQuote;
+ final String s = stringArray[si];
+ if (s == null) {
+ neededBufferLength += bufferAddition;
+ shouldQuote = false;
+ } else {
+ final int l = s.length();
+ neededBufferLength += l;
+ if (l == 0 || s.equalsIgnoreCase(NULL)) {
+ shouldQuote = true;
+ } else {
+ shouldQuote = false;
+ // scan for commas and quotes
+ for (int i = 0; i < l; i++) {
+ final char ch = s.charAt(i);
+ switch (ch) {
+ case '"':
+ case '\\':
+ shouldQuote = true;
+ // we will escape these characters
+ neededBufferLength++;
+ break;
+ case ',':
+ case '\'':
+ case '{':
+ case '}':
+ shouldQuote = true;
+ break;
+ default:
+ if (Character.isWhitespace(ch)) {
+ shouldQuote = true;
+ }
+ break;
+ }
+ }
+ }
+ // count the quotes
+ if (shouldQuote) {
+ neededBufferLength += 2;
+ }
+ }
+ shouldQuoteArray[si] = shouldQuote;
+ }
+
+ // construct the String
+ final StringBuilder sb = new StringBuilder(neededBufferLength);
+ sb.append('{');
+ for (int si = 0; si < arrayLength; si++) {
+ final String s = stringArray[si];
+ if (si > 0) {
+ sb.append(',');
+ }
+ if (s == null) {
+ sb.append(NULL);
+ } else {
+ final boolean shouldQuote = shouldQuoteArray[si];
+ if (shouldQuote) {
+ sb.append('"');
+ }
+ for (int i = 0, l = s.length(); i < l; i++) {
+ final char ch = s.charAt(i);
+ if (ch == '"' || ch == '\\') {
+ sb.append('\\');
+ }
+ sb.append(ch);
+ }
+ if (shouldQuote) {
+ sb.append('"');
+ }
+ }
+ }
+ sb.append('}');
+ assert sb.length() == neededBufferLength;
+ return sb.toString();
+ }
+
+ @Override
+ public Object getArray() throws SQLException {
+ return stringArray == null ? null : Arrays.copyOf(stringArray, stringArray.length);
+ }
+
+ @Override
+ public Object getArray(Map> map) throws SQLException {
+ return getArray();
+ }
+
+ @Override
+ public Object getArray(long index, int count) throws SQLException {
+ return stringArray == null ? null : Arrays.copyOfRange(stringArray, (int) index, (int) index + count);
+ }
+
+ @Override
+ public Object getArray(long index, int count, Map> map) throws SQLException {
+ return getArray(index, count);
+ }
+
+ @Override
+ public int getBaseType() throws SQLException {
+ return java.sql.Types.VARCHAR;
+ }
+
+ @Override
+ public String getBaseTypeName() throws SQLException {
+ return "text";
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getResultSet(Map> map) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getResultSet(long index, int count) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getResultSet(long index, int count, Map> map) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void free() throws SQLException {
+ }
+}
\ No newline at end of file
diff --git a/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/JsonUtil.java b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/JsonUtil.java
new file mode 100644
index 000000000..802343350
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/JsonUtil.java
@@ -0,0 +1,61 @@
+package org.atomhopper.dynamodb.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.atomhopper.dynamodb.model.PersistedEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Utility class for the methods declared in the dynamoDB feed Source.
+ */
+public class JsonUtil {
+ static Logger LOG = LoggerFactory.getLogger(JsonUtil.class);
+
+ /**
+ * Returns the current date minus the given number of seconds,
+ * equivalent to SQL's now() - interval 'n seconds'.
+ *
+ * @param seconds: number of seconds to subtract (e.g. 2)
+ * @return formatted date string ("yyyy-MM-dd HH:mm:ss")
+ */
+ public static String getCurrentDateWithMinusSecond(int seconds) {
+ LOG.info("getCurrentDateWithMinusSecond");
+ Format formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(new Date());
+ calendar.set(Calendar.SECOND, (calendar.get(Calendar.SECOND) - seconds));
+ LOG.info("RETURN RESULT VALUE:" + formatter.format(calendar.getTime()));
+ return formatter.format(calendar.getTime());
+ }
+
+ /**
+ * Parses a list of JSON strings into PersistedEntry objects using the Jackson ObjectMapper.
+ *
+ * @param feedPage: List of json object
+ * @return List of PersistentEntry
+ */
+
+ public static List getPersistenceEntity(List feedPage) {
+ ObjectMapper objectMapper = new ObjectMapper();
+ List jsonToPersistentList = new ArrayList<>();
+ // Convert the JSON strings to a list of PersistedEntry objects.
+ // Entries that fail to parse are skipped (stack trace printed).
+ // Define a custom TypeReference for the PersistedEntry type.
+ TypeReference mapType = new TypeReference() {
+ };
+ for (String s : feedPage) {
+ try {
+ jsonToPersistentList.add(objectMapper.readValue(s, mapType));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ return jsonToPersistentList;
+ }
+}
diff --git a/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/SQLToNoSqlConverter.java b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/SQLToNoSqlConverter.java
new file mode 100644
index 000000000..90b2153d3
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/SQLToNoSqlConverter.java
@@ -0,0 +1,302 @@
+package org.atomhopper.dynamodb.query;
+
+import com.unboundid.ldap.sdk.Filter;
+import com.unboundid.ldap.sdk.LDAPException;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.*;
+
+/**
+ * This class creates the search SQL and corresponding SQL parameters for a given set of query parameters.
+ *
+ * By default, this class assumes all categories are treated the same and stored in a single variable-length array.
+ *
+ * This class can also be customized to map specific categories to specific columns in the DB, allowing for higher
+ * performance when searching on these categories.
+ *
+ * To configure this functionality pass in the following:
+ *
+ *
+ * - Map[String, String] - a map of pairs where the key is the prefix found in the atom category and the
+ * value is the name of the SQL text column.
+ * - String - the String which separates the prefix from the value within the atom category. E.g.,
+ * for the category value of "tid:1234", if ':' is the mark, then "tid" is the prefix.
+ *
+ *
+ * An example configuration might be:
+ *
+ * A map of { "tid" => "tenantid", "type" => "eventtype" } with a mark of ":".
+ *
+ * This maps the following atom categories:
+ *
+ *
+ * - "tid:1234" => enter "1234" into the "tenantid" column
+ * - "type:lbaas.usage" => enter "lbaas.usage" into the "eventtype" column
+ *
+ */
+public class SQLToNoSqlConverter {
+
+ public static final String BAD_SEARCH_REGEX = ".*(\"|,).*";
+ public static final String BAD_CHAR_MSG = "Invalid Search Parameter: '\"' ',' not allowed.";
+
+ private static final String OPEN_PARENS = "(";
+ private static final String CLOSED_PARENS = ")";
+ private static final String OPEN_CURLY_BRACKET = "{";
+ private static final String CLOSED_CURLY_BRACKET = "}";
+ private static final String PLUS_SIGN = "+";
+ private static final String AND = " AND ";
+ private static final String OR = " OR ";
+ private static final String NOT = " not ";
+
+ private static final String CATEGORY = "cat";
+ private static final String CATEGORY_STRING = " (contains(categories, :categories)) ";
+
+ public static final String OLD_CATEGORY_STRING = " categories && ?::varchar[] ";
+
+ private static final String COLUMN_STRING= " = ? ";
+
+ private String prefixSplit = null;
+
+ private Map mapPrefix = new HashMap();
+
+ public SQLToNoSqlConverter() { }
+
+ public SQLToNoSqlConverter(Map mapper, String split ) {
+
+ prefixSplit = split;
+ mapPrefix = new HashMap( mapper );
+ }
+
+ public String getSqlFromSearchString(String searchString,Map map) {
+
+ if (StringUtils.isBlank(searchString)) {
+ return null;
+ }
+
+ if (searchString.startsWith(PLUS_SIGN)) {
+ //return getSqlForClassicSearchFormat(searchString);
+ return null;
+ } else if (searchString.startsWith(OPEN_PARENS)) {
+ searchString = textToLDapSearch(searchString);
+ Filter filter;
+ try {
+ filter = Filter.create(searchString);
+ } catch (LDAPException ex) {
+ throw new IllegalArgumentException("Invalid LDAP Search Parameter");
+ }
+ return getSqlFromLdapFilter(filter,map);
+ } else {
+ throw new IllegalArgumentException("Invalid Search Parameter: Search must begin with a '+' or a '(' character");
+ }
+ }
+
+ public List getParamsFromSearchString(String searchString) {
+
+ if (StringUtils.isBlank(searchString)) {
+ return new ArrayList();
+ }
+
+ if (searchString.startsWith(PLUS_SIGN)) {
+ return getParametersForClassicSearchFormat((searchString));
+ } else if (searchString.startsWith(OPEN_PARENS)) {
+ searchString = textToLDapSearch(searchString);
+ Filter filter;
+ try {
+ filter = Filter.create(searchString);
+ } catch (LDAPException ex) {
+ throw new IllegalArgumentException("Invalid LDAP Search Parameter");
+ }
+ return getParametersFromLdapFilter(filter);
+ } else {
+ throw new IllegalArgumentException("Invalid Search Parameter: Search must begin with a '+' or a '(' character");
+ }
+ }
+
+ private String textToLDapSearch(String searchString) {
+ searchString = searchString.replace("(AND", "(&");
+ searchString = searchString.replace("(OR", "(|");
+ searchString = searchString.replace("(NOT", "(!");
+ return searchString;
+ }
+
+ private String getSqlForClassicSearchFormat(String searchString) {
+
+ String[] params = searchString.split( "\\+" );
+
+ List sqlList = new ArrayList();
+ String last = "";
+
+ // first item is an empty string, so we skip
+ for( int i = 1; i < params.length; i++ ) {
+
+ String state = createSql( params[ i ], OLD_CATEGORY_STRING );
+
+ // if we have several generic categories, we only need 1 sql statement to handle them
+ if( !(state.equals( OLD_CATEGORY_STRING ) && last.equals( OLD_CATEGORY_STRING ) ) ) {
+
+ sqlList.add( state );
+ }
+
+ last = state;
+ }
+
+ StringBuilder sql = new StringBuilder();
+
+ sql.append(OPEN_PARENS);
+
+ for( int i = 0; i < sqlList.size(); i++ ) {
+
+ if ( i > 0 )
+ sql.append( OR );
+
+ sql.append( sqlList.get( i ) );
+ }
+
+ sql.append(CLOSED_PARENS);
+
+ return sql.toString();
+ }
+
+ private List getParametersForClassicSearchFormat(String searchString) {
+ List params = new ArrayList();
+ params.addAll(CategoryStringGenerator.getPostgresCategoryString(searchString, mapPrefix, prefixSplit ) );
+ return params;
+ }
+
+ private String getSqlFromLdapFilter(Filter filter,Map a) {
+
+ StringBuilder sql = new StringBuilder();
+
+ Filter[] filters = filter.getComponents();
+ Filter notFilter = filter.getNOTComponent();
+
+ switch (filter.getFilterType()) {
+
+ case Filter.FILTER_TYPE_AND:
+ for (int x=0 ; x < filters.length; x++) {
+ if (x == 0) {
+ sql.append(OPEN_PARENS);
+ }
+ if (x > 0) {
+ sql.append(AND);
+ }
+ sql.append(getSqlFromLdapFilter(filters[x],a));
+ if (x == filters.length - 1) {
+ sql.append(CLOSED_PARENS);
+ }
+ }
+ break;
+
+ case Filter.FILTER_TYPE_OR:
+ for (int x=0 ; x < filters.length; x++) {
+ if (x == 0) {
+ sql.append(OPEN_PARENS);
+ }
+ if (x > 0) {
+ sql.append(OR);
+ }
+ sql.append(getSqlFromLdapFilter(filters[x],a));
+ if (x == filters.length - 1) {
+ sql.append(CLOSED_PARENS);
+ }
+ }
+ break;
+
+ case Filter.FILTER_TYPE_NOT:
+ sql.append(OPEN_PARENS);
+ sql.append(NOT);
+ sql.append(getSqlFromLdapFilter(notFilter,a));
+ sql.append(CLOSED_PARENS);
+ break;
+
+ case Filter.FILTER_TYPE_EQUALITY:
+ if (!filter.getAttributeName().equals(CATEGORY)) {
+ throw new IllegalArgumentException("Invalid Search Parameter: LDAP attribute name must be 'cat'");
+ }
+ //String key = UUID.randomUUID().toString();
+ a.put(":categories",filter.getAssertionValue());
+ sql.append(CATEGORY_STRING);
+ break;
+ }
+
+ return sql.toString();
+ }
+
+ private String createSql( String param, String defaultSql ) {
+
+ if( prefixSplit != null ) {
+
+ int index = param.indexOf( prefixSplit );
+
+ if ( index != -1 ) {
+
+ String prefix = param.substring( 0, index );
+
+ // detect prefix in map
+ if ( mapPrefix.containsKey( prefix ) ) {
+
+ String column = mapPrefix.get( prefix );
+
+ return " " + column + COLUMN_STRING;
+ }
+ }
+ }
+
+ return defaultSql;
+ }
+
+ private List getParametersFromLdapFilter(Filter filter) {
+
+ List params = new ArrayList();
+
+ Filter[] filters = filter.getComponents();
+ Filter notFilter = filter.getNOTComponent();
+
+ switch (filter.getFilterType()) {
+
+ case Filter.FILTER_TYPE_AND:
+ case Filter.FILTER_TYPE_OR:
+ for (int x=0 ; x < filters.length; x++) {
+ params.addAll(getParametersFromLdapFilter(filters[x]));
+ }
+ break;
+
+ case Filter.FILTER_TYPE_NOT:
+ params.addAll(getParametersFromLdapFilter(notFilter));
+ break;
+
+ case Filter.FILTER_TYPE_EQUALITY:
+ if (!filter.getAttributeName().equals(CATEGORY)) {
+ throw new IllegalArgumentException("Invalid Search Parameter: LDAP attribute name must be 'cat'");
+ }
+
+ // default
+ String param = OPEN_CURLY_BRACKET + filter.getAssertionValue().toLowerCase() + CLOSED_CURLY_BRACKET;
+
+ if( prefixSplit != null ) {
+
+ int index = filter.getAssertionValue().indexOf( prefixSplit );
+ if ( index != -1 ) {
+
+ String prefix = filter.getAssertionValue().substring( 0, index );
+ if ( mapPrefix.containsKey( prefix ) ) {
+ param = filter.getAssertionValue();
+ }
+ }
+ }
+
+ if (param.matches( BAD_SEARCH_REGEX ) ) {
+
+ throw new IllegalArgumentException( BAD_CHAR_MSG );
+ }
+
+ params.add( param );
+ break;
+ default:
+
+ throw new IllegalArgumentException( "Invalid Search Parameter" );
+ }
+
+ return params;
+ }
+}
diff --git a/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/SearchType.java b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/SearchType.java
new file mode 100644
index 000000000..c1d76f6c6
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/main/java/org/atomhopper/dynamodb/query/SearchType.java
@@ -0,0 +1,11 @@
+package org.atomhopper.dynamodb.query;
+
+public enum SearchType {
+ FEED_FORWARD,
+ FEED_BACKWARD,
+ FEED_HEAD,
+ LAST_PAGE,
+ NEXT_LINK,
+ BY_TIMESTAMP_FORWARD,
+ BY_TIMESTAMP_BACKWARD
+}
diff --git a/adapters/dynamoDB_adapters/src/test/java/DynamoDBFeedPublisherTest.java b/adapters/dynamoDB_adapters/src/test/java/DynamoDBFeedPublisherTest.java
new file mode 100644
index 000000000..cfb0da7a9
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/test/java/DynamoDBFeedPublisherTest.java
@@ -0,0 +1,124 @@
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBQueryExpression;
+import com.amazonaws.services.dynamodbv2.document.*;
+import com.amazonaws.services.dynamodbv2.document.internal.IteratorSupport;
+import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
+import org.apache.abdera.model.Entry;
+import org.apache.abdera.parser.stax.FOMEntry;
+import org.atomhopper.adapter.request.adapter.DeleteEntryRequest;
+import org.atomhopper.adapter.request.adapter.PostEntryRequest;
+import org.atomhopper.adapter.request.adapter.PutEntryRequest;
+import org.atomhopper.dynamodb.adapter.DynamoDBFeedPublisher;
+import org.atomhopper.dynamodb.model.PersistedEntry;
+import org.atomhopper.response.AdapterResponse;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.springframework.http.HttpStatus;
+
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import static junit.framework.Assert.*;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DynamoDBFeedPublisherTest {
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+ @Mock
+ private DynamoDB dynamoDB;
+ @Mock
+ DynamoDBQueryExpression querySpec;
+ @Mock
+ private DynamoDBMapper dynamoDBMapper;
+ private DynamoDBFeedPublisher dynamoDBFeedPublisher = new DynamoDBFeedPublisher();
+ @Mock
+ private AmazonDynamoDBClient amazonDynamoDBClient;
+
+
+ private final String MARKER_ID = UUID.randomUUID().toString();
+ private final String ENTRY_BODY = "";
+ private final String FEED_NAME = "namespace/feed";
+ private PersistedEntry persistedEntry;
+ private List entryList;
+ private PostEntryRequest postEntryRequest;
+ private PutEntryRequest putEntryRequest;
+ private DeleteEntryRequest deleteEntryRequest;
+
+ @Before
+ public void setUp() throws Exception {
+ dynamoDBFeedPublisher.setDynamoDBClient(amazonDynamoDBClient);
+ dynamoDBFeedPublisher.setDynamoMapper(dynamoDBMapper);
+ dynamoDBFeedPublisher.setDynamoDB(dynamoDB);
+ persistedEntry = new PersistedEntry();
+ persistedEntry.setFeed(FEED_NAME);
+ persistedEntry.setEntryId(MARKER_ID);
+ persistedEntry.setEntryBody(ENTRY_BODY);
+ persistedEntry.setDateLastUpdated(getDateFormatInString(new Date()));
+ persistedEntry.setCreationDate(getDateFormatInString(new Date()));
+ entryList = new ArrayList();
+ entryList.add(persistedEntry);
+ putEntryRequest = mock(PutEntryRequest.class);
+ deleteEntryRequest = mock(DeleteEntryRequest.class);
+ postEntryRequest = mock(PostEntryRequest.class);
+ when(postEntryRequest.getEntry()).thenReturn(entry());
+ when(postEntryRequest.getFeedName()).thenReturn("namespace/feed");
+ }
+
+ @Test
+ public void showSaveTheObjectInDynamoDb() throws Exception {
+ dynamoDBFeedPublisher.setAllowOverrideId(false);
+ doNothing().when(dynamoDBMapper).save(persistedEntry);
+ AdapterResponse adapterResponse = dynamoDBFeedPublisher.postEntry(postEntryRequest);
+ assertEquals("Should return HTTP 201 (Created)", HttpStatus.CREATED, adapterResponse.getResponseStatus());
+ }
+
+ @Test
+ public void showErrorIfAlreadyExistsEntryIDInDynamoDb() throws Exception {
+ final Table mockTable = mock(Table.class);
+ when(dynamoDB.getTable(any(String.class))).thenReturn(mockTable);
+ final Index mockIndex = mock(Index.class);
+ when(mockTable.getIndex(anyString())).thenReturn(mockIndex);
+ final ItemCollection outcome = mock(ItemCollection.class);
+ when(mockIndex.query(any(QuerySpec.class))).thenReturn(outcome);
+ final IteratorSupport<Item, QueryOutcome> mockIterator = mock(IteratorSupport.class);
+ final Item mockItem = new Item();
+ when(outcome.iterator()).thenReturn(mockIterator);
+ when(mockIterator.hasNext()).thenReturn(true, false);
+ when(mockIterator.next()).thenReturn(mockItem);
+ AdapterResponse adapterResponse = dynamoDBFeedPublisher.postEntry(postEntryRequest);
+ assertEquals("Should return HTTP 409 (Conflict)", HttpStatus.CONFLICT, adapterResponse.getResponseStatus());
+
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void shouldPutEntry() throws Exception {
+ dynamoDBFeedPublisher.putEntry(putEntryRequest);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void shouldDeleteEntry() throws Exception {
+ dynamoDBFeedPublisher.deleteEntry(deleteEntryRequest);
+ }
+
+ public Entry entry() {
+ final FOMEntry entry = new FOMEntry();
+ entry.setId(UUID.randomUUID().toString());
+ entry.setContent("testing");
+ entry.addCategory("category");
+ return entry;
+ }
+
+ public String getDateFormatInString(Date date) {
+ Format formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ return formatter.format(date);
+ }
+}
diff --git a/adapters/dynamoDB_adapters/src/test/java/DynamoDBFeedSourceTest.java b/adapters/dynamoDB_adapters/src/test/java/DynamoDBFeedSourceTest.java
new file mode 100644
index 000000000..7a37db9c5
--- /dev/null
+++ b/adapters/dynamoDB_adapters/src/test/java/DynamoDBFeedSourceTest.java
@@ -0,0 +1,222 @@
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.datamodeling.*;
+import com.amazonaws.services.dynamodbv2.document.*;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.internal.IteratorSupport;
+import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
+import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
+import org.apache.abdera.Abdera;
+import org.apache.abdera.model.Feed;
+import org.atomhopper.adapter.request.adapter.GetEntryRequest;
+import org.atomhopper.adapter.request.adapter.GetFeedRequest;
+import org.atomhopper.dynamodb.adapter.DynamoDBFeedSource;
+import org.atomhopper.dynamodb.adapter.DynamoDBFeedPublisher;
+import org.atomhopper.dynamodb.model.PersistedEntry;
+import org.atomhopper.dynamodb.query.JsonUtil;
+import org.atomhopper.response.AdapterResponse;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.springframework.http.HttpStatus;
+
+import java.net.URL;
+import java.util.*;
+
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DynamoDBFeedSourceTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+ @Mock
+ private DynamoDBMapper dynamoDBMapper;
+ @Mock
+ private DynamoDB dynamoDB;
+ @Mock
+ private AmazonDynamoDBClient amazonDynamoDBClient;
+ @Mock
+ private PaginatedQueryList paginatedQueryList;
+ private DynamoDBFeedPublisher dynamoFeedDBPublisher = new DynamoDBFeedPublisher();
+ private GetFeedRequest getFeedRequest;
+ private DynamoDBFeedSource dynamoDBFeedSource;
+ private GetEntryRequest getEntryRequest;
+ private PersistedEntry persistedEntry;
+ private List entryList;
+ private List emptyList;
+ private Abdera abdera;
+ private final String MARKER_ID = UUID.randomUUID().toString();
+ private final String ENTRY_BODY = "";
+ private final String FEED_NAME = "namespace/feed";
+ private final String FORWARD = "forward";
+ private final String BACKWARD = "backward";
+ private final String SINGLE_CAT = "+Cat1";
+ private final String MULTI_CAT = "+Cat1+Cat2";
+ private final String AND_CAT = "(AND(cat=cat1)(cat=cat2))";
+ private final String OR_CAT = "(OR(cat=cat1)(cat=cat2))";
+ private final String NOT_CAT = "(NOT(cat=CAT1))";
+ private final String MOCK_LAST_MARKER = "last";
+ private final String NEXT_ARCHIVE = "next-archive";
+ private final String ARCHIVE_LINK = "http://archive.com/namespace/feed/archive";
+ private final String CURRENT = "current";
+
+
+ @Before
+ public void setUp() throws Exception {
+ dynamoDBFeedSource = new DynamoDBFeedSource(amazonDynamoDBClient);
+ dynamoFeedDBPublisher.setDynamoDBClient(amazonDynamoDBClient);
+ dynamoFeedDBPublisher.setDynamoMapper(dynamoDBMapper);
+ dynamoDBFeedSource.setDynamoDB(dynamoDB);
+ persistedEntry = new PersistedEntry();
+ persistedEntry.setFeed(FEED_NAME);
+ persistedEntry.setEntryId(MARKER_ID);
+ persistedEntry.setEntryBody(ENTRY_BODY);
+
+ emptyList = new ArrayList();
+
+ entryList = new ArrayList();
+ entryList.add(persistedEntry);
+
+ // Mocks
+ abdera = mock(Abdera.class);
+ getFeedRequest = mock(GetFeedRequest.class);
+ getEntryRequest = mock(GetEntryRequest.class);
+
+ dynamoDBFeedSource.setArchiveUrl(new URL(ARCHIVE_LINK));
+
+ // Mock GetEntryRequest
+ when(getEntryRequest.getFeedName()).thenReturn(FEED_NAME);
+ when(getEntryRequest.getEntryId()).thenReturn(MARKER_ID);
+
+ //Mock GetFeedRequest
+ when(getFeedRequest.getFeedName()).thenReturn(FEED_NAME);
+ when(getFeedRequest.getPageSize()).thenReturn("25");
+ when(getFeedRequest.getAbdera()).thenReturn(abdera);
+
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowExceptionForPrefixColumnMap() throws Exception {
+
+ dynamoDBFeedSource.setDelimiter(":");
+ dynamoDBFeedSource.afterPropertiesSet();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowExceptionForDelimiter() throws Exception {
+
+ Map map = new HashMap();
+ map.put("test1", "testA");
+
+ dynamoDBFeedSource.setPrefixColumnMap(map);
+ dynamoDBFeedSource.afterPropertiesSet();
+ }
+
+ @Test
+ public void shouldNotGetFeedWithMarkerDirectionForward() throws Exception {
+ Abdera localAbdera = new Abdera();
+ when(getFeedRequest.getAbdera()).thenReturn(localAbdera);
+ when(getFeedRequest.getPageMarker()).thenReturn(MARKER_ID);
+ when(getFeedRequest.getDirection()).thenReturn("FORWARD");
+ final Table mockTable = mock(Table.class);
+ when(dynamoDB.getTable(any(String.class))).thenReturn(mockTable);
+ final Index mockIndex = mock(Index.class);
+ when(mockTable.getIndex(anyString())).thenReturn(mockIndex);
+ final ItemCollection outcome = mock(ItemCollection.class);
+ when(mockIndex.query(any(QuerySpec.class))).thenReturn(outcome);
+ final IteratorSupport<Item, QueryOutcome> mockIterator = mock(IteratorSupport.class);
+ final Item mockItem = new Item();
+ when(outcome.iterator()).thenReturn(mockIterator);
+ when(mockIterator.hasNext()).thenReturn(false);
+ when(mockIterator.next()).thenReturn(mockItem);
+ assertEquals("Should get a 404 response", HttpStatus.NOT_FOUND,
+ dynamoDBFeedSource.getFeed(getFeedRequest).getResponseStatus());
+ }
+
+ @Test
+ public void shouldNotGetFeedWithMarkerDirectionBackward() throws Exception {
+ Abdera localAbdera = new Abdera();
+ when(getFeedRequest.getAbdera()).thenReturn(localAbdera);
+ when(getFeedRequest.getPageMarker()).thenReturn(MARKER_ID);
+ when(getFeedRequest.getDirection()).thenReturn("BACKWARD");
+ final Table mockTable = mock(Table.class);
+ when(dynamoDB.getTable(any(String.class))).thenReturn(mockTable);
+ final Index mockIndex = mock(Index.class);
+ when(mockTable.getIndex(anyString())).thenReturn(mockIndex);
+ final ItemCollection outcome = mock(ItemCollection.class);
+ when(mockIndex.query(any(QuerySpec.class))).thenReturn(outcome);
+ final IteratorSupport<Item, QueryOutcome> mockIterator = mock(IteratorSupport.class);
+ final Item mockItem = new Item();
+ when(outcome.iterator()).thenReturn(mockIterator);
+ when(mockIterator.hasNext()).thenReturn(false);
+ when(getFeedRequest.getAbdera()).thenReturn(localAbdera);
+ when(getFeedRequest.getPageMarker()).thenReturn(MARKER_ID);
+ when(getFeedRequest.getDirection()).thenReturn("BACKWARD");
+ assertEquals("Should get a 404 response", HttpStatus.NOT_FOUND,
+ dynamoDBFeedSource.getFeed(getFeedRequest).getResponseStatus());
+ }
+
+
+ @Test
+ public void shouldGetFeedHead() throws Exception {
+ Abdera localAbdera = new Abdera();
+ List<String> newList = new ArrayList<>();
+ when(getFeedRequest.getDirection()).thenReturn("forward");
+ when(getFeedRequest.getAbdera()).thenReturn(localAbdera);
+ when(getEntryRequest.getAbdera()).thenReturn(localAbdera);
+ final Table mockTable = mock(Table.class);
+ when(dynamoDB.getTable(any(String.class))).thenReturn(mockTable);
+ final Index mockIndex = mock(Index.class);
+ when(mockTable.getIndex(anyString())).thenReturn(mockIndex);
+ final ItemCollection outcome = mock(ItemCollection.class);
+ when(mockIndex.query(any(QuerySpec.class))).thenReturn(outcome);
+ final IteratorSupport<Item, QueryOutcome> mockIterator = mock(IteratorSupport.class);
+ final Item mockItem = new Item();
+ when(outcome.iterator()).thenReturn(mockIterator);
+ when(mockIterator.hasNext()).thenReturn(false);
+ when(mockIterator.next()).thenReturn(mockItem);
+ newList.add("fe9eedc8-d10c-47a6-9209-f755ea8c35c3");
+ newList.add("");
+ DynamoDBFeedSource mainModel = Mockito.mock(DynamoDBFeedSource.class);
+ Mockito.when(mainModel.getQueryBuilderMethod(dynamoDB, "feed = :feed", "dateLastUpdated < :dateLastUpdated", 25, new ValueMap(), true)).thenReturn(newList);
+ assertEquals("Should get a 200 response", HttpStatus.OK,
+ dynamoDBFeedSource.getFeed(getFeedRequest).getResponseStatus());
+ }
+
+
+ @Test(expected = RuntimeException.class)
+ public void shouldReturnFeedWithCorrectTimeStampForForwardDirection() throws Exception {
+ final Table mockTable = mock(Table.class);
+ when(dynamoDB.getTable(any(String.class))).thenReturn(mockTable);
+ final Index mockIndex = mock(Index.class);
+ when(mockTable.getIndex(anyString())).thenReturn(mockIndex);
+ final ItemCollection outcome = mock(ItemCollection.class);
+ when(mockIndex.query(any(QuerySpec.class))).thenReturn(outcome);
+ final IteratorSupport<Item, QueryOutcome> mockIterator = mock(IteratorSupport.class);
+ final Item mockItem = new Item();
+ when(outcome.iterator()).thenReturn(mockIterator);
+ when(mockIterator.hasNext()).thenReturn(false);
+ when(mockIterator.next()).thenReturn(mockItem);
+ when(getFeedRequest.getDirection()).thenReturn(BACKWARD);
+ dynamoDBFeedSource.getFeedPageByTimestamp(getFeedRequest, "2014-03-10T00:00:00.000Z", 25);
+ }
+
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void shouldSetParameters() throws Exception {
+ Map map = new HashMap();
+ map.put("test1", "test2");
+ dynamoDBFeedSource.setParameters(map);
+
+
+ }
+
+}
+
diff --git a/atomhopper/pom.xml b/atomhopper/pom.xml
index 3278171ae..d8cf695f0 100644
--- a/atomhopper/pom.xml
+++ b/atomhopper/pom.xml
@@ -20,6 +20,13 @@
+
+ com.amazonaws
+ aws-java-sdk-dynamodb
+ 1.12.352
+
+
+
org.atomhopper
core
@@ -55,6 +62,12 @@
jdbc-adapter
+
+ org.atomhopper.adapter
+ dynamoDB_adapters
+ 1.2.35-SNAPSHOT
+
+
postgresql
postgresql
@@ -84,6 +97,31 @@
me.moocar
logback-gelf
+
+
+ org.springframework
+ spring-expression
+ 4.2.5.RELEASE
+
+
+
+ org.atomhopper.adapter
+ dynamoDB_adapters
+ 1.2.35-SNAPSHOT
+
+
+
+ jakarta.xml.bind
+ jakarta.xml.bind-api
+ 2.3.2
+
+
+
+
+ org.glassfish.jaxb
+ jaxb-runtime
+ 2.3.2
+
com.yammer.metrics
diff --git a/atomhopper/src/main/resources/META-INF/atom-server.cfg.xml b/atomhopper/src/main/resources/META-INF/atom-server.cfg.xml
index 49967182b..85e19f121 100644
--- a/atomhopper/src/main/resources/META-INF/atom-server.cfg.xml
+++ b/atomhopper/src/main/resources/META-INF/atom-server.cfg.xml
@@ -1,8 +1,6 @@
@@ -15,23 +13,23 @@ NOTE: Place this file in the following folder: /etc/atomhopper/atom-server.cfg.x
will be incorrect.
Scheme should be either "http" or "https"
-->
-
+
-
+
-
-
+
+
-
+
\ No newline at end of file
diff --git a/atomhopper/src/main/webapp/META-INF/application-context.xml b/atomhopper/src/main/webapp/META-INF/application-context.xml
index 255eebd41..67d402a3f 100644
--- a/atomhopper/src/main/webapp/META-INF/application-context.xml
+++ b/atomhopper/src/main/webapp/META-INF/application-context.xml
@@ -4,7 +4,7 @@
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mongo="http://www.springframework.org/schema/data/mongo"
xsi:schemaLocation=
- "http://www.springframework.org/schema/context
+ "http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/data/mongo
http://www.springframework.org/schema/data/mongo/spring-mongo-1.0.xsd
@@ -14,60 +14,60 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+ com.amazonaws.regions.Regions
+
+
+ fromName
+
+
+
+ us-east-1
+
+
+
+
+
+ com.amazonaws.regions.Region
+
+
+ getRegion
+
+
+
+
+
+
+
+
+
+
+
+
+ setRegion
+
+
+
+
+
+
+
+
+
+
+
+
+
+ setEndpoint
+
+
+
+ https://dynamodb.us-east-1.amazonaws.com
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/atomhopper/src/main/webapp/WEB-INF/web.xml b/atomhopper/src/main/webapp/WEB-INF/web.xml
index 1114d39f5..8fb8d65c1 100644
--- a/atomhopper/src/main/webapp/WEB-INF/web.xml
+++ b/atomhopper/src/main/webapp/WEB-INF/web.xml
@@ -14,7 +14,7 @@
-->
file:///etc/atomhopper/application-context.xml
-
+
org.atomhopper.ExternalConfigLoaderContextListener
@@ -85,14 +85,14 @@
Atom-Hopper-Metrics
/atommetrics/*
-
+
logback/configuration-resource
java.lang.String
file:///etc/atomhopper/logback.xml
-
+
ch.qos.logback.classic.selector.servlet.ContextDetachingSCL
-
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index efb1f5e57..b26928404 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,6 +19,7 @@
2.2
3.0.1
9.4.17.v20190418
+ 4.2.5.RELEASE
@@ -40,6 +41,7 @@
atomhopper
test-suite
documentation
+ adapters/dynamoDB_adapters
@@ -49,6 +51,14 @@
+
+
+ org.springframework
+ spring-expression
+ ${spring.version}
+
+
+
org.atomhopper
core
@@ -381,15 +391,12 @@
releases.maven.research.rackspace.com
Rackspace Research Releases
- https://maven.research.rackspacecloud.com/content/repositories/releases
-
-
+ https://maven.research.rackspacecloud.com/content/repositories/releases
snapshots.maven.research.rackspace.com
Rackspace Research Snapshots
- https://maven.research.rackspacecloud.com/content/repositories/snapshots
-
+ https://maven.research.rackspacecloud.com/content/repositories/snapshots