Apache Kafka

Apache Kafka is a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system.

Kafka is run as a cluster that stores streams of records in categories called topics. A topic is a category or feed name to which records are published.

Kafka has four core APIs:

  • Producers publish a stream of records to one or more Kafka topics.
  • Consumers subscribe to one or more topics and process the stream of records produced to them.
  • Stream processors allow an application to consume input streams and produce output streams.
  • Connectors are producers or consumers that connect Kafka topics to existing applications or data systems.
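All four APIs revolve around topics as named streams of records. As a toy illustration of the publish-subscribe model (plain Java, not the Kafka client API — class and method names below are invented for this sketch), producers append records to a topic and subscribed consumers receive them:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory broker: records are appended to named topics,
// and every consumer subscribed to a topic receives each new record.
class TopicDemo {
    private final Map<String, List<String>> topics = new HashMap<>();
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // "Producer" side: publish a record to a topic.
    void publish(String topic, String record) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(record);
        subscribers.getOrDefault(topic, List.of()).forEach(c -> c.accept(record));
    }

    // "Consumer" side: subscribe a handler to a topic.
    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    public static void main(String[] args) {
        TopicDemo broker = new TopicDemo();
        List<String> received = new ArrayList<>();
        broker.subscribe("ship-positions", received::add);          // consumer
        broker.publish("ship-positions", "{\"id\":\"219014435\"}"); // producer
        System.out.println(received); // prints [{"id":"219014435"}]
    }
}
```

The real Kafka client adds persistence, partitioning, and consumer offsets on top of this basic model.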

More information is available on Kafka's design philosophy and use cases.

Implementation examples exist for Producers and Consumers, and for Kafka with Spring WebSockets.

Apache Storm

Apache Flume

The version 1.5 repository is useful for looking into implementation details.


Custom source

Example for a PollableSource.


A tiny example for an EventDrivenSource:

  • implements Configurable: configure(Context ctx)
  • implements EventDrivenSource
  • overrides start() and stop()
  • is based on WebsocketClient, an Endpoint implementation.
// imports from org.apache.flume.*, java.net/java.io, and the WebSocket client omitted
public class CustomSource extends AbstractSource implements Configurable, EventDrivenSource {

	private WebsocketClient clientEndPoint;

	@Override
	public void configure(Context ctx) {
		try {
			String endpoint = ctx.getString("endpoint");
			String user = ctx.getString("user");
			String pw = ctx.getString("password");
			String subscribe = ctx.getString("subscriptionString");
			clientEndPoint = new WebsocketClient(new URI(endpoint), user, pw, subscribe);
		} catch (DeploymentException | IOException | URISyntaxException e) {
			throw new FlumeException("Could not connect to endpoint", e);
		}
		// Each incoming message becomes one Flume event.
		clientEndPoint.addMessageHandler(new MessageHandler() {
			public void handleMessage(String message) {
				ChannelProcessor processor = getChannelProcessor();
				Event event = new SimpleEvent();
				JsonValue value = Json.parse(message);
				event.setHeaders(createEventHeaders(value));
				event.setBody(message.getBytes(StandardCharsets.UTF_8));
				processor.processEvent(event);
			}
		});
	}

	// The "oid" header can later be referenced in the agent configuration, e.g. for file naming.
	private Map<String, String> createEventHeaders(JsonValue value) {
		Map<String, String> h = new TreeMap<String, String>();
		String oid = value.asObject().get("payload").asObject().get("id").asString();
		h.put("oid", oid);
		return h;
	}

	@Override
	public void start() {
		super.start();
	}

	@Override
	public void stop() {
		super.stop();
	}
}
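The source can then be wired into the agent configuration via its fully qualified class name; the class name, endpoint URL, and source name below are assumptions for illustration (the property keys match what configure(Context) reads above):

```
ais.sources = ws1
ais.channels = c1
ais.sources.ws1.type = com.example.flume.CustomSource
ais.sources.ws1.endpoint = wss://server:port/messages
ais.sources.ws1.user = xx
ais.sources.ws1.password = xx
ais.sources.ws1.subscriptionString = xx
ais.sources.ws1.channels = c1
```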

HDFS sinks

HDFS sinks can produce different file formats: SequenceFile, DataStream or CompressedStream.

I used human-readable SequenceFiles:

ais.sinks.hdfs1.hdfs.fileType = SequenceFile
ais.sinks.hdfs1.hdfs.writeFormat = Text

which results in:

1552319652676   {"id":"219014435","position":[11.858878135681152,57.645938873291016]}
1552319652684   {"id":"205484390","position":[3.687695026397705,51.09416961669922]}

The key (the first part of each line) is, by default, the event header timestamp (read e.g. here for custom keys).

If a key is not required, use a DataStream:

ais.sinks.hdfs1.hdfs.fileType = DataStream 	
ais.sinks.hdfs1.hdfs.writeFormat = Text


Using a CompressedStream saves storage space:

ais.sinks.hdfs1.hdfs.fileType = CompressedStream
ais.sinks.hdfs1.hdfs.codeC = gzip
ais.sinks.hdfs1.hdfs.writeFormat = Text

Creating a more customised file name is possible via event header items. As implemented above, the header contains an object id called “oid” that can be used in the agent configuration:

ais.sinks.hdfs1.hdfs.filePrefix = %{oid}
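Putting the pieces together, a complete sink definition might look like this; hdfs.path and the roll interval are placeholder values, not taken from the setup described here:

```
ais.sinks = hdfs1
ais.sinks.hdfs1.type = hdfs
ais.sinks.hdfs1.channel = c1
ais.sinks.hdfs1.hdfs.path = hdfs://server:port/ship/incoming
ais.sinks.hdfs1.hdfs.filePrefix = %{oid}
ais.sinks.hdfs1.hdfs.fileType = DataStream
ais.sinks.hdfs1.hdfs.writeFormat = Text
ais.sinks.hdfs1.hdfs.rollInterval = 3600
```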


ChannelFullException: Space for commit to queue couldn't be acquired.

I tried to write to an HDFS sink and got this exception in the logs:

org.apache.flume.ChannelException: Unable to put event on required channel:{name: c1}
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
        ... 33 more

After some research, increasing JVM_OPTS, and unsuccessfully tuning the channel parameters, it turned out that the flume user was not allowed to write to the hdfs.path.
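For reference, the channel parameters usually tuned for this error are the memory channel's capacity settings; the values below are illustrative only (and did not help here, since the real problem was the HDFS permission):

```
ais.channels.c1.type = memory
ais.channels.c1.capacity = 100000
ais.channels.c1.transactionCapacity = 10000
```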

Installing plugins in Ambari

Flume plugins have their own dedicated directory. In Ambari it's:

mkdir /usr/hdp/current/flume-server/plugins.d/

Dependencies go into a subdir:
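The standard Flume plugin layout separates the plugin jar (lib) from its dependencies (libext); the plugin and jar names below are made up for illustration:

```
plugins.d/customsource/lib/customsource.jar
plugins.d/customsource/libext/websocket-client.jar
plugins.d/customsource/libext/minimal-json.jar
```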


or provide a fat jar instead.

Managing agents in Ambari


Simple Feature Types

Ingesting into table

General advice for developing converters


The CSV converter docs contain a lot of useful information.


The JSON converter docs already explain a lot. To ingest a ship data file that contains multiple JSON documents:

{"payload":{"mmsi":"11111","position":[5.061738442342835,52.4892342343],"SOG":0.0,"COG":0.0,"navStatus":"engaged in fishing","recordedAt":"2019-01-14T00:00:00.072Z"}}
{"payload":{"mmsi":"66666","position":[10.869939804077148,59.077205657958984],"SOG":0.0,"COG":290.3,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.084Z"}}
{"payload":{"mmsi":"88888","position":[30.880006790161133,70.88723754882812],"SOG":4.9,"COG":232.6,"navStatus":"engaged in fishing","recordedAt":"2019-03-14T00:00:02.103Z"}}

you can use this converter definition:

cat conf/sfts/shipdata/reference.conf
geomesa {
  sfts {
    shipdata = {
      fields = [
        { name = "mmsi",        type = "Integer",            index = "full" }
        { name = "geom",        type = "Point", srid = 4326, index = true   }
        { name = "recordedAt",  type = "Date",               index = "full" }
        { name = "navStatus",   type = "String"                             }
        { name = "sog",         type = "Double"                             }
        { name = "cog",         type = "Double"                             }
      ]
      user-data = {
        geomesa.table.sharing = "false"
      }
    }
  }
  converters {
    shipdata = {
      type = "json"
      id-field = "concatenate($mmsi,$recordedAt)"
      options {
        error-mode = "raise-errors"
      }
      fields = [
        { name = "mmsi",        json-type = "string",   path = "$.payload.mmsi",             transform = "$0::int"             }
        { name = "lon",         json-type = "double",   path = "$.payload.position[0]"                                         }
        { name = "lat",         json-type = "double",   path = "$.payload.position[1]"                                         }
        { name = "geom",                                                                     transform = "point($lon, $lat)"   }
        { name = "recordedAt",  json-type = "string",   path = "$.payload.recordedAt",       transform = "dateTime($0)"        }
        { name = "navStatus",   json-type = "string",   path = "$.payload.navStatus"                                           }
        { name = "sog",         json-type = "double",   path = "$.payload.SOG"                                                 }
        { name = "cog",         json-type = "double",   path = "$.payload.COG"                                                 }
      ]
    }
  }
}


  • the feature-path parameter is optional, and not needed for our simple documents
  • to create points from a json array, an intermediate transformation step is defined to extract lat and lon

The json files can be gzipped, and can reside in HDFS:

ingest -c geomesa.shipdata -C shipdata -s shipdata --user xx --password xx --zookeepers a,b,c  --input-format json  --instance myinstance --run-mode local   hdfs://server:port/ship/archive/2019-01.gz


The chat is a helpful place to ask questions.


Hortonworks Data Platform (HDP)



Time series


twosigma flint: Python only

spark-timeseries: no longer active; the developer recommends flint

Data protection


hdfs-audit.log files


Trainings and Certifications

From here and here

Cloudera (has swallowed Hortonworks)

HDP Spark Developer DEV-343


DataFrames/DataSets/RDD, shuffling, transformations and performance tuning, streaming

  • dataOps, analysts
  • 4 days
  • $3200 (virtual classroom) or
  • €3500 (Paris)

HDP Data Science SCI-241


key concepts like neural networks, classification, regression, and clustering; tools and frameworks like Spark MLlib and TensorFlow

  • analysts, scientists
  • 4 days
  • on request

Cloudera Certified Associate (CCA)



DB 301 - Apache Spark™ for Machine Learning and Data Science

  • 3 days
  • virtual classroom
  • $2500

short, self-paced courses based on AWS or Azure: $75





Big Data mit Hadoop

introduction, overview, first steps

  • analysts, dataOps
  • 3 days
  • 2000 €
  • Berlin


Hewlett Packard (has swallowed MapR)

Microsoft Azure


Big Data Hadoop Certification Training Course

Aligned with the Cloudera CCA175 certification exam


bigdata.txt · Last modified: 2020/07/13 13:39 by mantis