diff --git a/WORKSPACE b/WORKSPACE index b040e43fa3e..f41aac5636b 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -922,6 +922,18 @@ new_http_archive( build_file = "third_party/nomad/nomad.BUILD", ) +# for twitter connector +TWITTER_CONNECTOR_VERSION = "4.0.6" +maven_jar( + name = "org_twitter4j_core", + artifact = "org.twitter4j:twitter4j-core:" + TWITTER_CONNECTOR_VERSION, +) + +maven_jar( + name = "org_twitter4j_stream", + artifact = "org.twitter4j:twitter4j-stream:" + TWITTER_CONNECTOR_VERSION, +) + # scala integration rules_scala_version="5cdae2f034581a05e23c3473613b409de5978833" # update this as needed diff --git a/connectors/heron-twitter/src/java/BUILD b/connectors/heron-twitter/src/java/BUILD new file mode 100644 index 00000000000..e3ff08e5c29 --- /dev/null +++ b/connectors/heron-twitter/src/java/BUILD @@ -0,0 +1,21 @@ +package(default_visibility = ["//visibility:public"]) + +heron_deps_files = [ + "//heron/api/src/java:api-java-low-level", +] + +third_party_deps_files = [ + "//third_party/java:guava", + "//third_party/java:kryo", + "//third_party/java:logging", +] + +all_deps = heron_deps_files + third_party_deps_files + +java_binary( + name = 'twitter-spout', + srcs = glob(["com/twitter/heron/twitter/**/*.java"]), + deps = all_deps + [ + "//third_party/java:twitter4j", + ], +) diff --git a/connectors/heron-twitter/src/java/com/twitter/heron/twitter/spout/Authentication.java b/connectors/heron-twitter/src/java/com/twitter/heron/twitter/spout/Authentication.java new file mode 100644 index 00000000000..404afd1cb57 --- /dev/null +++ b/connectors/heron-twitter/src/java/com/twitter/heron/twitter/spout/Authentication.java @@ -0,0 +1,55 @@ +package com.streamlio.connectors.twitter; + +import java.io.Serializable; + +@SuppressWarnings("serial") +public class Authentication { + private String consumerKey; + private String consumerSecret; + private String accessToken; + private String accessTokenSecret; + private String[] keyWords; + + public Authentication() { + } + + public String getConsumerKey() { + return consumerKey; + } + + public void setConsumerKey(String consumerKey) { + this.consumerKey = consumerKey; + } + + public String getConsumerSecret() { + return consumerSecret; + } + + public void setConsumerSecret(String consumerSecret) { + this.consumerSecret = consumerSecret; + } + + public String getAccessToken() { + return accessToken; + } + + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + + public String getAccessTokenSecret() { + return accessTokenSecret; + } + + public void setAccessTokenSecret(String accessTokenSecret) { + this.accessTokenSecret = accessTokenSecret; + } + + public String[] getKeyWords() { + return keyWords; + } + + public void setKeyWords(String[] keyWords) { + this.keyWords = keyWords; + } +} diff --git a/connectors/heron-twitter/src/java/com/twitter/heron/twitter/spout/Twitter.java b/connectors/heron-twitter/src/java/com/twitter/heron/twitter/spout/Twitter.java new file mode 100644 index 00000000000..f1c891ad6a0 --- /dev/null +++ b/connectors/heron-twitter/src/java/com/twitter/heron/twitter/spout/Twitter.java @@ -0,0 +1,122 @@ +package com.streamlio.connectors.twitter; + +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; +import java.util.logging.Logger; + +import com.google.common.base.Preconditions; + +import twitter4j.FilterQuery; +import twitter4j.StallWarning; +import twitter4j.Status; +import twitter4j.StatusDeletionNotice; +import twitter4j.StatusListener; +import twitter4j.TwitterStream; +import twitter4j.TwitterStreamFactory; +import twitter4j.auth.AccessToken; +import twitter4j.conf.ConfigurationBuilder; + +import com.twitter.heron.api.Config; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.api.topology.OutputFieldsDeclarer; +import com.twitter.heron.api.spout.BaseRichSpout; +import com.twitter.heron.api.spout.SpoutOutputCollector; +import com.twitter.heron.api.tuple.Fields; +import com.twitter.heron.api.tuple.Values; +import com.twitter.heron.api.utils.Utils; + +@SuppressWarnings("serial") +public class Twitter extends BaseRichSpout { + private static final long serialVersionUID = 4322775001819135036L; + private static final Logger LOG = Logger.getLogger(Twitter.class.getName()); + + private String componentId; + private String spoutId; + private SpoutOutputCollector collector; + + private Authentication authInfo; + + private LinkedBlockingQueue queue = null; + private TwitterStream twitterStream; + + public Twitter(Authentication authInfo) { + this.authInfo = authInfo; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) { + this.componentId = context.getThisComponentId(); + this.spoutId = String.format("%s-%s", componentId, context.getThisTaskId()); + this.collector = spoutOutputCollector; + + this.queue = new LinkedBlockingQueue(1000); + StatusListener listener = new StatusListener() { + @Override + public void onStatus(Status status) { + queue.offer(status); + } + + @Override + public void onDeletionNotice(StatusDeletionNotice sdn) {} + + @Override + public void onTrackLimitationNotice(int i) {} + + @Override + public void onScrubGeo(long l, long l1) {} + + @Override + public void onException(Exception ex) {} + + @Override + public void onStallWarning(StallWarning arg0) { + // TODO Auto-generated method stub + } + }; + + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb.setDebugEnabled(true) + .setOAuthConsumerKey(authInfo.getConsumerKey()) + .setOAuthConsumerSecret(authInfo.getConsumerSecret()) + .setOAuthAccessToken(authInfo.getAccessToken()) + .setOAuthAccessTokenSecret(authInfo.getAccessTokenSecret()); + + twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); + twitterStream.addListener(listener); + + if (authInfo.getKeyWords().length == 0) { + twitterStream.sample(); + } else { + FilterQuery query = new FilterQuery().track(authInfo.getKeyWords()); + twitterStream.filter(query); + } + } + + @Override + public void nextTuple() { + Status ret = queue.poll(); + if (ret == null) { + Utils.sleep(50); + } else { + collector.emit(new Values(ret)); + } + } + + @Override + public void close() { + super.close(); + twitterStream.shutdown(); + } + + @Override + public void ack(Object id) {} + + @Override + public void fail(Object id) {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("tweet")); + } +} diff --git a/third_party/cereal/BUILD b/third_party/cereal/BUILD new file mode 100644 index 00000000000..3f7e5ecc3b0 --- /dev/null +++ b/third_party/cereal/BUILD @@ -0,0 +1,126 @@ +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +package_name = "cereal" +package_version = "1.2.1" + +package_file = package_name + "-" + package_version + ".tar.gz" +package_dir = package_name + "-" + package_version + +file_list = [ + "include/cereal/access.hpp", + "include/cereal/archives/adapters.hpp", + "include/cereal/archives/binary.hpp", + "include/cereal/archives/json.hpp", + "include/cereal/archives/portable_binary.hpp", + "include/cereal/archives/xml.hpp", + "include/cereal/cereal.hpp", + "include/cereal/details/helpers.hpp", + "include/cereal/details/polymorphic_impl.hpp", + "include/cereal/details/polymorphic_impl_fwd.hpp", + "include/cereal/details/static_object.hpp", + "include/cereal/details/traits.hpp", + "include/cereal/details/util.hpp", + "include/cereal/external/base64.hpp", + "include/cereal/external/rapidjson/allocators.h", + "include/cereal/external/rapidjson/document.h", + "include/cereal/external/rapidjson/encodedstream.h", + "include/cereal/external/rapidjson/encodings.h", + "include/cereal/external/rapidjson/error/en.h", + "include/cereal/external/rapidjson/error/error.h", + "include/cereal/external/rapidjson/filereadstream.h", + "include/cereal/external/rapidjson/filewritestream.h", + "include/cereal/external/rapidjson/fwd.h", + "include/cereal/external/rapidjson/internal/biginteger.h", + "include/cereal/external/rapidjson/internal/diyfp.h", + "include/cereal/external/rapidjson/internal/dtoa.h", + "include/cereal/external/rapidjson/internal/ieee754.h", + "include/cereal/external/rapidjson/internal/itoa.h", + "include/cereal/external/rapidjson/internal/meta.h", + "include/cereal/external/rapidjson/internal/pow10.h", + "include/cereal/external/rapidjson/internal/regex.h", + "include/cereal/external/rapidjson/internal/stack.h", + "include/cereal/external/rapidjson/internal/strfunc.h", + "include/cereal/external/rapidjson/internal/strtod.h", + "include/cereal/external/rapidjson/internal/swap.h", + "include/cereal/external/rapidjson/istreamwrapper.h", + "include/cereal/external/rapidjson/memorybuffer.h", + "include/cereal/external/rapidjson/memorystream.h", + "include/cereal/external/rapidjson/msinttypes/inttypes.h", + "include/cereal/external/rapidjson/msinttypes/stdint.h", + "include/cereal/external/rapidjson/ostreamwrapper.h", + "include/cereal/external/rapidjson/pointer.h", + "include/cereal/external/rapidjson/prettywriter.h", + "include/cereal/external/rapidjson/rapidjson.h", + "include/cereal/external/rapidjson/reader.h", + "include/cereal/external/rapidjson/schema.h", + "include/cereal/external/rapidjson/stream.h", + "include/cereal/external/rapidjson/stringbuffer.h", + "include/cereal/external/rapidjson/writer.h", + "include/cereal/external/rapidxml/rapidxml.hpp", + "include/cereal/external/rapidxml/rapidxml_iterators.hpp", + "include/cereal/external/rapidxml/rapidxml_print.hpp", + "include/cereal/external/rapidxml/rapidxml_utils.hpp", + "include/cereal/macros.hpp", + "include/cereal/types/array.hpp", + "include/cereal/types/base_class.hpp", + "include/cereal/types/bitset.hpp", + "include/cereal/types/boost_variant.hpp", + "include/cereal/types/chrono.hpp", + "include/cereal/types/common.hpp", + "include/cereal/types/complex.hpp", + "include/cereal/types/concepts/pair_associative_container.hpp", + "include/cereal/types/deque.hpp", + "include/cereal/types/forward_list.hpp", + "include/cereal/types/functional.hpp", + "include/cereal/types/list.hpp", + "include/cereal/types/map.hpp", + "include/cereal/types/memory.hpp", + "include/cereal/types/polymorphic.hpp", + "include/cereal/types/queue.hpp", + "include/cereal/types/set.hpp", + "include/cereal/types/stack.hpp", + "include/cereal/types/string.hpp", + "include/cereal/types/tuple.hpp", + "include/cereal/types/unordered_map.hpp", + "include/cereal/types/unordered_set.hpp", + "include/cereal/types/utility.hpp", + "include/cereal/types/valarray.hpp", + "include/cereal/types/vector.hpp", +] + +genrule( + name = "cereal-srcs", + srcs = [ + package_file, + ], + outs = file_list, + cmd = "\n".join([ + "export WORKSPACE_ROOT=$$(pwd)", + "export INSTALL_DIR=$$(pwd)/$(@D)", + "export TMP_DIR=$$(mktemp -d -t cereal.XXXXX)", + "mkdir -p $$TMP_DIR", + "cp -R $(SRCS) $$TMP_DIR", + "cd $$TMP_DIR", + "tar xfz " + package_file, + "cd " + package_dir, + "$$WORKSPACE_ROOT/$(location //scripts/compile:env_exec) cmake -Wno-dev -DCMAKE_INSTALL_PREFIX:PATH=$$INSTALL_DIR .", + "$$WORKSPACE_ROOT/$(location //scripts/compile:env_exec) make install", + "rm -rf $$TMP_DIR", + ]), + tools = [ + "//scripts/compile:env_exec", + ], +) + +cc_library( + name = "cereal-cxx", + srcs = [ + "empty.cc", + ] + file_list, + includes = [ + "include", + ], + linkstatic = 1, +) diff --git a/third_party/cereal/cereal-1.2.1.tar.gz b/third_party/cereal/cereal-1.2.1.tar.gz new file mode 100644 index 00000000000..7f469eed370 Binary files /dev/null and b/third_party/cereal/cereal-1.2.1.tar.gz differ diff --git a/third_party/cereal/empty.cc b/third_party/cereal/empty.cc new file mode 100644 index 00000000000..e69de29bb2d diff --git a/third_party/java/BUILD b/third_party/java/BUILD index ba594518e85..8e293ec611f 100644 --- a/third_party/java/BUILD +++ b/third_party/java/BUILD @@ -361,3 +361,16 @@ java_library( "@commons_logging_commons_logging//jar", ], ) + +java_library( + name = "twitter4j", + srcs = [ "Empty.java" ], + exports = [ + "@org_twitter4j_stream//jar", + "@org_twitter4j_core//jar", + ], + deps = [ + "@org_twitter4j_stream//jar", + "@org_twitter4j_core//jar", + ], +)