Skip to content

Developer Guide

Ambud edited this page Jan 1, 2017 · 5 revisions

Linea is a framework for embedded Stream processing capabilities and provides an Apache Storm like API. Here are a few facts about Linea:

  • Linea supports only at-least once semantics
  • Topologies are hard wired (at the moment)
  • Bolts support only 1 type of Routing

Get the dependency: (check the latest version from releases)

<dependency>
    <groupId>com.srotya</groupId>
    <artifactId>linea</artifactId>
    <version>0.0.x</version>
</dependency>

Working example: https://github.com/srotya/linea/tree/master/src/main/java/com/srotya/linea/example

Note: Linea is written in Java however can be used from other JVM languages as well using the Java classes.

Components

Before we dive into code, let's review the components of this framework:

Event: Linea's version of a Storm Tuple

Bolt: Custom code to execute

Spout: A special class of Bolt that generates Events

BoltExecutor: Operates a collection of Bolt instances in parallel

Collector: A facade used by Spouts and Bolts for emitting and acking Events

Router: Responsible for Routing events to the correct bolt instance across workers

Acker: A special Bolt that uses GROUPBY Routing Type

ROUTING_TYPE: Linea's version of Storm Grouping

Columbus: A worker discovery service

Topology: Topology is Directed Graph representation of the Stream Processing flow

The logical flow of Linea is extremely simple, there are Spouts and Bolts each of them have several instances which provides parallelism and Events are routed between them depending on your topology.

Unlike Storm, Linea topologies are hardwired and not dynamically configurable at the moment, each Bolt must know what bolt name is next in the processing line for a given event and Bolt must know what Routing Type it supports. More on this in the next Spout / Bolt section.

Topology

The topology represents how Events will flow through the different processing parts called Bolts. Topology class is what will execute your entire streaming flow and is something that should be run in a main method or it's equivalent in your application. The topology code will run identically across all the workers of this topology that you launch, where launching worker is simply starting your Java application with one or more Linea topologies inside it.

Below is what a topology looks like in Java code. In this topology the TestSpout generates random events that are consumed by the PrinterBolt.

Map<String, String> conf = new HashMap<>();
conf.put(Topology.WORKER_ID, "0");
conf.put(Topology.WORKER_COUNT, "1");
conf.put(Topology.WORKER_DATA_PORT, "5000");
conf.put(Topology.ACKER_PARALLELISM, "1");
Topology topology = new Topology(conf);
topology = topology.addSpout(new TestSpout(1000000), 1).addBolt(new PrinterBolt(), 1).start();

Collector

This is simply a facade to provide developer-friendly methods for acking and emitting Events. There is 1 unique collector instance per Bolt instance.

Bolt

A Bolt (similar to Storm Bolt) is provides the wrapper to execute your custom processing like transformations, aggregations, sink etc. A Bolt is defined by implementing the com.srotya.linea.processors.Bolt interface.

Here's a simple bolt that simply prints and acks all the Events it receives:

public class PrinterBolt implements Bolt {

	private static final long serialVersionUID = 1L;
	private transient Collector collector;

	@Override
	public void configure(Map<String, String> conf, int taskId, Collector collector) {
		this.collector = collector;

	}

	@Override
	public void process(Event event) {
		System.out.println("Print event:" + event);
		collector.ack(event);
	}

	@Override
	public ROUTING_TYPE getRoutingType() {
		return ROUTING_TYPE.SHUFFLE;
	}

	@Override
	public String getBoltName() {
		return "printerbolt";
	}

	@Override
	public void ready() {
	}

}

Each bolt must implement the all the methods annotated with @Override. Here are a few requirements while implementing Bolts:

  • Must have a unique name, other bolts will use this name to send Event to it
  • Must have SHUFFLE or GROUPBY routing types (more on this later)
  • Must be serializable

The configure method is like a constructor you can use it to start and connect any external connections if necessary or instantiate any data structures since the Bolts are serializable so any non-serializable fields can't be instantiated from the constructor.

The process method is called every time there is an event to be consumed.

Routing Type

A Bolt can have different Event subscription strategies, it's how it would like to receive Events. E.g. if a bolt is designed to logically perform aggregations then it should use GROUPBY Routing, else if a bolt performs stateless transformations it should use a SHUFFLE Routing strategy.

If a Bolt uses GROUBY strategy, it must have a non-null Constants.FIELD_GROUPBY_ROUTING_KEY header value. Router takes a Murmur hash of this value for the routing mod hash.

If a Bolt uses SHUFFLE strategy, it's Event ID is used for mod hash

A Bolt can have only 1 type of Routing strategy.

Spout

Spout is a special type of Bolt that is generates Events. To implement your own Spout, you must use extend the com.srotya.linea.processors.Spout class.

The Spout is free to generate Events however it likes, there is no polling for nextEvent in Linea, rather the Spout like all other Bolts is given the ready() method that executes under a separate thread and can infinitely generate Events.

Here's a simple Spout:

public class TestSpout extends Spout {

	private static final long serialVersionUID = 1L;
	private transient Collector collector;
	private transient int taskId;
	private int messageCount;

	public TestSpout(int messageCount) {
		// number of messages to emit
		this.messageCount = messageCount;
	}

	@Override
	public void configure(Map<String, String> conf, int taskId, Collector collector) {
		this.taskId = taskId;
		this.collector = collector;
	}

	@Override
	public String getBoltName() {
		return "testSpout";
	}

	@Override
	public void ready() {
		for (int i = 0; i < messageCount; i++) {
			Event event = collector.getFactory().buildEvent(taskId + "_" + i);
			event.getHeaders().put("uuid", taskId + "host" + i);
			emittedEvents.add(event.getEventId());
			// send all of these events to printer bolt
			collector.spoutEmit("printerbolt", event);
			if (i % 100000 == 0) {
				System.err.println("Produced " + i + " events:" + taskId);
			}
		}
	}

	@Override
	public void ack(Long eventId) {
		System.out.println("Spout acking event:" + eventId);
	}

	@Override
	public void fail(Long eventId) {
		System.out.println("Spout failing event:" + eventId);
	}

}

Router

The Router is probably the most critical part of Linea as it's responsible to routing Events to the correct bolt instance. A given type of Bolt can have 1 or more instances and these instances can be spread across 1 or more workers, it's the responsibility of the Router to calculate which Bolt instance should process a given event based on the Routing Type of the destination Bolt and it's location i.e. is it running locally or is it on a different worker node.

Router is also responsible to manage the Network Client and Servers for Linea that are used for Inter-Worker communication when Events need to be sent and received across workers over the network.

As a developer you will not be interacting with the Router directly, rather you will interact with the Collector which further interacts with the Router.

FAQs

Why is dynamic topology configuration not available?

Linea is designed to be an embeddable framework, the analogy is applications require tight coupling rather than generic components. Some level of decoupling can be achieved using abstract classes for Spout and Bolt implementations so that end developers can define their topology specific internals. Having written several complex Streaming applications, the dynamic re-configuration was rarely used for solving business problems.

If you would like dynamic topology reconfiguration support, please open an issue.

Why does a Bolt have to be serializable?

Bolt is required to be serializable so that it can be cloned into multiple instances. These instances are then run under different threads to provide parallelism.

Can I have odd parallelism for a given Bolt across workers?

No, Linea will always run the same number of Bolt Executor threads for a given bolt across all workers. Therefore if you have a Bolt parallelism of 'n' then the total parallelism for your Bolt across all 'm' workers will be 'n' x 'm' e.g. if you configure a bolt with parallelism of 1 across 10 workers, your total parallelism is 10 or if you configure a bolt with parallelism of 3 across 3 workers, your total parallelism for that bolt is 9.

What's the difference between a Bolt instance and a Bolt type?

Your topology can have different types of Bolts e.g. JSONTranslatorBolt, PrinterBolt, HDFSBolt. Each of these bolts will have 1 or more instances depending on your parallelism configuration for that bolt.

How to do performance tuning?

Here's everything performance related to help you tune your topology: https://github.com/srotya/linea/wiki/Performance-Tuning

What serialization does Linea use for Inter-Worker Communication (IWC)?

Linea uses Kryo for IWC. No serialization used if your Even never leaves a worker.

Is it necessary for Worker IDs to start from 0?

Yes, Linea's Router uses mod(%) hash based balancing to route Events across Bolt instances across different workers. The worker IDs starting from 0 is critical to the math used to calculate this.

What's a Instance ID, how can I use it?

Instance ID is similar to a task ID in Storm and it providing information of the unique ID of a given Bolt instance. E.g. if Bolt parallelism is 2 and there are 2 workers then your Instance IDs will be Worker0[0, 1] and Worker1[2, 3]

You can use InstanceIDs to perform State persistence to an external data store like Redis or HBase as if a worker is restarted, it can recover it's state and not that of any other Bolt.