What is Spring XD?

Spring XD is a unified, distributed, and extensible service for data ingestion, real-time analytics, batch processing, and data export. What does that mean to the average person? If we want to move a massive amount of data from Greenplum and land it in Gemfire for application use, we can do that. If we want to take real-time data from a queue or a REST API and move it into MySQL, we can do that too. In short, Spring XD solves the problem that many companies have when they want to move large amounts of data from point A to point B. One of the great flexibilities of Spring XD is that it doesn't really matter what point A looks like. It could be a database, a file system, a webserver, an API, or even an Amazon S3 bucket. If Spring XD doesn't have a native module (we'll learn what those are in just a little bit), we can easily script up a bit of Python to grab whatever data we need from point A. In practice, the only real limitation we run into with Spring XD is the creativity of the developer.

Before we dive into our demo, let's get some of the preliminaries out of the way. We'll be defining these terms at a high level in order to get on with the fun part, but a more in-depth discussion of Spring XD can always be found in the official documentation.

http://docs.spring.io/spring-xd/docs/1.3.0.RELEASE/reference/html/

We define a module as simply a stream-processing task, or one unit of work with respect to the entire data stream. In this post we'll discuss three different types of modules: source modules, processor modules, and sink modules. Spring XD ships with a plethora of modules that are available out of the box with a vanilla installation of the service. Some of these include shell processors, transformers, filters, counters, and jobs. There are even modules for the sources we're grabbing our data from and the sinks we're landing the data to. A few of the more popular source and sink modules that are native to Spring XD are Greenplum, Gemfire, and HDFS.
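Once we have the Spring XD shell open (we'll get there later in the demo), we can browse everything that ships out of the box; the module list command should print all available source, processor, sink, and job modules.

xd:> module list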

Intuitively, we define a stream as simply a pipe-delimited chain of modules:

source | processor one | processor two | sink

We aren’t without limitations though. We’re only allowed one source module and one sink module per stream. If we run into a situation where we want to sink the same data to two different destinations we can use a stream tap. You can think of this in essentially the same way you would think of a wiretap on a telephone, and we’ll cover this more later on in the example. We can have as many processor modules as we wish, where our only limitation is how much memory our container has available for us to use.

Once we create and deploy a stream, the modules that define it are deployed onto containers. The Spring XD admin server talks to ZooKeeper, which dictates which module is deployed to which container. At a high level you can just think of a container as something that holds our modules. Generally we deploy one container per node; however, you can deploy as many containers per node as you have CPUs available.

With respect to high availability, consider the following example. Suppose we have a four-node cluster with one admin server and four container servers evenly distributed among the nodes. If any one of those containers becomes unhealthy or unavailable, the modules on that container automatically migrate to a healthy container elsewhere in the cluster. This is how Spring XD avoids stream downtime. If we added a second admin server to the cluster, it would likewise take over from an unhealthy admin server. Hence, our stream is highly available and the data will keep flowing with a reasonable degree of certainty.

Demo Prerequisites

In this demo we’re going to poll earthquake data from the USGS (U.S. Geological Survey).  It is recommended that you have at least 8 gigabytes of memory available for this demo.  Below is a list of software that we’ll be using.

  • Spring XD
    • Single node installation
  • Redis
    • In-memory key value store
    • Takes the place of a message bus for single node installation
    • Not suggested for production environments (Instead consider RabbitMQ or Kafka)
  • Gemfire (Apache Geode)
    • In-memory key value store, excellent for application use
    • Distributed, fault tolerant, highly available
  • Greenplum
    • Long-term data storage and analytics via MADlib
    • MPP (Massively Parallel Processing)
  • Zookeeper
    • Embedded in single node installation of Spring XD
    • Dictates which module deploys on which container server
  • Groovy
    • Data point transformation mapping and standardization
  • Python
    • Poll source data from API

Note that the single node installation of Spring XD comes with an embedded version of Redis and Zookeeper, so there is no need to install those services separately.

zData Analytics Sandbox

We encourage you to download the zData Analytics Sandbox to avoid manually installing Spring XD, Greenplum, and MADlib. However, everything we do in this example can be carried over to a distributed environment with little effort.

https://zdatainc.com/zdata-greenplum-sandbox

We’ll be downloading the VirtualBox image. Once you’ve downloaded the image, unzip it and double-click centos65-greenplum.ovf.

[Screenshot: importing the centos65-greenplum.ovf appliance into VirtualBox]

The first thing we're going to want to do is increase the memory of our VM, since we're going to be running Apache Geode (Gemfire), which isn't natively included in our sandbox.

In VirtualBox, right-click centos65-greenplum and go to Settings. This will open a new window; click the System tab. From here you'll see a slider representing the VM's memory and a text box to the right of it. In the text box enter 8000 to give our VM roughly eight gigabytes of memory (assuming you have 8 GB free).

[Screenshot: VirtualBox System settings with the memory set to 8000 MB]

After you've modified the memory, just double-click centos65-greenplum in the left-hand sidebar of VirtualBox to fire it up. Once it has booted you'll see the following prompt.

[Screenshot: zData Analytics Sandbox login prompt]

You’re now logged into the zData Analytics Sandbox. Within the sandbox we have Greenplum + MADlib along with Spring XD already fired up and ready to use.

Groovy Installation

Our sandbox already has Python installed, so let's begin by installing Groovy.

$ sudo -i
# cd /usr/share
# wget http://dl.bintray.com/groovy/maven/apache-groovy-binary-2.4.5.zip
# unzip apache-groovy-binary-2.4.5.zip
# cat > /etc/profile.d/groovy.sh <<EOF
> export GROOVY_HOME=/usr/share/groovy-2.4.5
> export PATH=/usr/share/groovy-2.4.5/bin:\$PATH
> EOF
# su - gpadmin

Groovy should now be installed. You can verify the installation from the command line (for example, by running groovy -version) and you should see version output similar to the screenshot below.

[Screenshot: verifying the Groovy installation]

We’ll be using Groovy to write our data transformation code. The earthquake data we’re polling is in JSON format and Groovy makes it very easy to traverse and parse this data.

Data Overview

Before we go any further, we really need to know what the data we're polling looks like. Without a good grasp of the structure and variability of our data and its data points, we can't go much further. You should always ask yourself the following questions when dealing with new data:

  • What format is the data in (XML, JSON, etc.)?
  • Does the data change with each poll?
  • Are there any unique identifiers?
  • Do we need to make any unit conversions for any data points?

Each of these questions is paramount for building our data stream. In this example we'll be polling earthquake data, so a lot of the data points we poll won't be immediately recognizable to us. You can follow the link below to check the USGS documentation. They use a standard format called GeoJSON.

http://earthquake.usgs.gov/earthquakes/feed/v1.0/geojson.php

To get a sample data record you can simply type the following into your web browser.

http://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&starttime=2016-01-10T00:46:58.309933&endtime=2016-01-10T01:03:38.309933

{
    "type": "FeatureCollection",
    "features": [
        {
            "geometry": {
                "type": "Point",
                "coordinates": [
                    -122.793663,
                    38.8074989,
                    0.2
                ]
            },
            "type": "Feature",
            "properties": {
                "rms": 0.02,
                "code": "72578011",
                "cdi": null,
                "sources": ",nc,",
                "nst": 7,
                "tz": -480,
                "title": "M 0.5 - 4km NW of The Geysers, California",
                "magType": "md",
                "detail": "http://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nc72578011&format=geojson",
                "sig": 4,
                "net": "nc",
                "type": "earthquake",
                "status": "automatic",
                "updated": 1452387538830,
                "felt": null,
                "alert": null,
                "dmin": 0.001364,
                "mag": 0.52,
                "gap": 90,
                "types": ",general-link,geoserve,nearby-cities,origin,phase-data,",
                "url": "http://earthquake.usgs.gov/earthquakes/eventpage/nc72578011",
                "ids": ",nc72578011,",
                "tsunami": 0,
                "place": "4km NW of The Geysers, California",
                "time": 1452387447850,
                "mmi": null
            },
            "id": "nc72578011"
        },
        {
            "geometry": {
                "type": "Point",
                "coordinates": [
                    -118.9421692,
                    37.6463318,
                    4.73
                ]
            },
            "type": "Feature",
            "properties": {
                "rms": 0.03,
                "code": "72578006",
                "cdi": null,
                "sources": ",nc,",
                "nst": 9,
                "tz": -480,
                "title": "M 1.1 - 2km E of Mammoth Lakes, California",
                "magType": "md",
                "detail": "http://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nc72578006&format=geojson",
                "sig": 18,
                "net": "nc",
                "type": "earthquake",
                "status": "automatic",
                "updated": 1452387198800,
                "felt": null,
                "alert": null,
                "dmin": 0.01977,
                "mag": 1.07,
                "gap": 120,
                "types": ",general-link,geoserve,nearby-cities,origin,phase-data,",
                "url": "http://earthquake.usgs.gov/earthquakes/eventpage/nc72578006",
                "ids": ",nc72578006,",
                "tsunami": 0,
                "place": "2km E of Mammoth Lakes, California",
                "time": 1452387104810,
                "mmi": null
            },
            "id": "nc72578006"
        }
    ],
    "bbox": [
        -122.793663,
        37.6463318,
        0.2,
        -118.9421692,
        38.8074989,
        4.73
    ],
    "metadata": {
        "status": 200,
        "count": 2,
        "title": "USGS Earthquakes",
        "url": "http://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&starttime=2016-01-10T00:46:58.309933&endtime=2016-01-10T01:03:38.309933",
        "generated": 1452387819000,
        "api": "1.1.0"
    }
}

We can see that we're given location data (coordinates), the magnitude, a tsunami flag (1 for yes, 0 for no), and a title that gives us a human-readable description of the event. We can always rename these data points to something friendlier using a Groovy transformation script.

To go back to our data questions, let's answer a few before we move forward.

  • Data format: JSON
  • Data point variability: the structure is the same with each poll; only the length of the features array varies
  • Unique identifiers: id
  • Unit conversions: convert event times from epoch milliseconds to a date-time stamp (see the sketch below)
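Since the epoch-to-timestamp conversion is the only real unit conversion we need, here is roughly what it boils down to, sketched in Python for clarity (in the stream itself we'll do this inside the Groovy transformer; the helper name below is just for illustration):

# Sketch only: the stream performs this conversion in the Groovy transformer
import datetime
import pytz

def epoch_ms_to_utc(ms):
    # USGS reports "time" and "updated" as milliseconds since the Unix epoch
    utc = datetime.datetime.fromtimestamp(ms / 1000.0, tz=pytz.utc)
    return utc.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

print(epoch_ms_to_utc(1452387447850))   # 2016-01-10T00:57:27.850Z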

With these questions answered we can move on to the high level design of what our stream is going to look like. Since the data is coming from a REST API we’re going to want to begin the stream with a trigger source module.

Stream Design

Trigger Module

All the trigger source module does is send a carriage return to the next module in the stream at a time interval of our choosing. Using the USGS API we can pull data at whatever interval we choose; however, we aren't guaranteed to get data back with every poll. For the purposes of this demo we'll poll every 10 seconds and filter out any null payloads we get in return.

Shell Module

The second module in our stream will be a shell command processor module. All this module does is execute the code that we give it. This will be our Python script that polls the data from the API. We'll need to code it in such a way that it waits for carriage returns from the trigger module and polls the API as soon as it detects one.

Transform Module

Next we're going to want to transform the data that we get from each poll. This is where we use our transformer module, which is just a bit of Groovy code. In this code we're going to map the data points to standardized names and make our unit conversion from epoch time to a date-time stamp. Additionally, if there are data points in the record that we don't care about, we have the option of not returning them here. Generally for analytics the rule of thumb is the more data the better, so perhaps this is where we should consider either a tap or a completely separate stream.

Splitter Module

The transform module is going to return a JSON array full of individual records. We want the ability to sink each of these individual records into Geode. Conveniently, Spring XD offers a splitter module that will split any payload into multiple payloads (i.e., one large JSON record into many). This is important for us when sinking data to Geode, since it expects one JSON record per write. It also helps ensure data integrity as the payload passes through the stream. For example, suppose we have some kind of failure where our message bus drops one of our messages (however unlikely). If we split the payload beforehand, it could mean the difference between losing a million records and losing only one.
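Conceptually, the splitter turns the single JSON array our transformer emits into one message per element. In Python terms it's doing something like the following (purely an illustration of the idea, not how Spring XD implements it):

import json

# A miniature version of the transformer's output
payload = '{"Data": [{"code": "72578011"}, {"code": "72578006"}]}'

# In the real stream a jsonPath expression on the splitter selects the Data
# array; each element then travels through the rest of the stream as its own message
for record in json.loads(payload)["Data"]:
    print(json.dumps(record))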

Filter Module

The filter module should be intuitive. It simply filters messages out of our stream given the conditions that we set. If we poll data from USGS and we're not given any events back in that 10-second interval, then we're going to get a null payload. Since the Gemfire and Greenplum sink modules are expecting data, we need to filter out any null payload we may receive from the API at this point in the stream.

Sink Module

Lastly, the sink module refers to where we want to land our data.  In our example we’ll be using the gemfire-json-server and gpfdist sink modules in order to land our data in Geode and Greenplum respectively.

High-level Stream Definitions

Combining these modules makes up our streams. Note that we'll still need to add a number of optional parameters, but this is a good roadmap.

trigger | shell (Python) | transformer (Gemfire data) | splitter | filter (null data) | sink (Gemfire)

trigger | shell (Python) | transformer (Greenplum data) | filter (null data) | sink (Greenplum)

Why Geode and Greenplum?

Geode is generally used for applications since it offers such quick access to data. Our earthquake app may only want to show the event time, title, location, and magnitude. In this case we don't really care about the rest of the data, so we could have a stream that just extracts and maps those data points. Since Greenplum is an MPP database, it's going to serve as our long-term storage solution. With the help of MADlib we can run analytics on this data to retrieve a number of insights not readily seen on the surface of the data. For this reason we're going to sink all of the available data points into Greenplum in their raw format.

With the combination of Geode and Greenplum we not only have an infrastructure that supports application use, but also BI tools, data science, analytics, and long term storage.

Stream Tap

A tap is a stream in and of itself. We can tap any stream at any of its modules, then transform that data and sink it to our preferred destination. In a production environment you could tap either of these streams to save memory; however, since we're working in a VM we don't have enough memory to run both Greenplum and Geode (Gemfire) at the same time. Hence, we'll be working with two separate streams.

trigger | shell (Python) | transformer (Gemfire data) | splitter | filter (null data) | sink (Gemfire)

tap (data output from Gemfire shell module) | transformer (Greenplum data) | filter (null data) | sink (Greenplum)

In the above stream definitions, observe that the tapping stream doesn't deploy its own trigger or shell module; those live only in the original stream. The tap source is essentially the data output of whichever module you specify in the stream you're tapping. In the definitions above we're tapping the Geode stream before its transformer module, so we have access to the complete raw data polled from the API. This is a great way to conserve memory. Note that if for any reason the trigger or shell module in the original stream stops, the tapping stream stops receiving data as well. This is highly unlikely since Spring XD is built for exactly this kind of scenario, but it should always be kept in mind when building a stream tap.
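For reference, an actual tap definition in the Spring XD shell would look something like the sketch below. This is only an illustration (the stream name, module label, and gpfdist parameters would have to match your own definitions), and it is not a stream we deploy in this demo.

stream create --name quake-gp-tap --definition "tap:stream:quake-gf.shell > transform --script=file:/quakePoll/quake-gp.groovy --outputType=text/plain | filter --expression=payload!='' | gpfdist --dbHost=master --dbUser=gpadmin --dbPassword=gpadmin --dbPort=6543 --table=quake_raw --columnDelimiter='|'" --deploy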

Download Scripts

To speed up the process a bit, we've prepackaged all of the scripts we're going to be using for our streams. However, to get the most out of this demo we encourage you to follow along and hand-code the scripts yourself.

$ sudo -i
# mkdir /quakePoll
# cd /quakePoll
# wget https://dl.dropboxusercontent.com/u/33925634/zdata-xd-demo.tar.gz
# tar -zxvf zdata-xd-demo.tar.gz

 

Data Polling Script (Python)

We're going to need to install two Python modules in order for our poll script to work correctly. To do this, simply issue the following commands in your VM terminal.

$ sudo -i
# yum install -y epel-release 
# yum install -y python-pip
# pip install requests
# pip install pytz

 

Now that we have a good idea of what our stream is going to look like at a high level, we can go ahead and start coding. Below is a Python script that polls the earthquake data from the API. It's important to remember that the first module is a trigger module which sends a carriage return on a time interval, so our Python script has to be ready for that. The script below runs in a loop and executes the poll every time it receives a carriage return.

#!/usr/bin/env python
# /quakePoll/quakePoll.py
# zData Inc (2016) Spring XD Streaming Earthquake Data Demo
#
# Description:  Polls earthquake information from USGS upon receiving a carriage return from the trigger module
#

import requests
import sys
import json
import datetime
import pytz

def eod():
    sys.stdout.write("\r\n")
    sys.stdout.flush()

while True:
    # Wait for carriage return
    data = raw_input()

    # Get current time (ISO 8601) and the time 10 seconds ago to construct the interval
    current_date = datetime.datetime.now(tz=pytz.utc)
    prev_date = current_date - datetime.timedelta(seconds=10)

    # Build USGS url
    url = "http://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&starttime=" + str(prev_date.isoformat())[:-6] + "&endtime=" + str(current_date.isoformat())[:-6]
    # Make API request and poll data
    r = requests.get(url)
    sys.stdout.write(json.dumps(r.json()))
    eod()
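Before handing this script to Spring XD, it's worth a quick sanity check from the terminal. Each press of the Enter key stands in for the trigger module's carriage return and should cause one GeoJSON payload to be printed (possibly with an empty features array if nothing shook in the previous 10 seconds); exit with Ctrl+C when you're done.

$ python /quakePoll/quakePoll.py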

Transformer Code (Groovy)

Now we're going to write our transformer module code. This code takes the data polled by quakePoll.py and transforms it into a new JSON record for Geode containing only the data points we want. We'll also have a second transformer that does much the same thing, except that it returns pipe-delimited text for Greenplum to ingest.

/**
  * /quakePoll/quake-gf.groovy
  * zData Inc. (2016)
  */
import groovy.json.*

def slurper = new JsonSlurper()
def jsonPayload = null;
def list = []

try {
    // Parse payload
    jsonPayload = slurper.parseText(payload)

    // Parse and map data
    jsonPayload.features.each {
        def output = new LinkedHashMap()

        // Get current date
        TimeZone.setDefault(TimeZone.getTimeZone('UTC'))
        def now = new Date()
        def currentDate = now.format("yyyy-MM-dd'T'HH:mm:ss:SS'Z'")

        // Convert event time from epoch to UTC date time
        def eventDate = new Date(((long)it.properties.time)).format("yyyy-MM-dd'T'HH:mm:ss:SS'Z'")

        output['read_datetime'] = "${currentDate}"
        output['event_datetime'] = "${eventDate}"
        output['code'] = "${it.properties.code}"
        output['nst'] = "${it.properties.nst}"
        output['tz_offset'] = "${it.properties.tz}"
        output['magnitude'] = "${it.properties.mag}"
        output['tsunami'] = it.properties.tsunami > 0 ? "Tsunami warning" : "No tsunami threat"
        output['detail'] = "${it.properties.detail}"
        output['type'] = "${it.properties.type}"
        output['title'] = "${it.properties.title}"
        it.geometry.coordinates.eachWithIndex { item, index ->
            if (index == 0) output['longitude'] = "${item}" 
            if (index == 1) output['latitude'] = "${item}" 
            if (index == 2) output['depth'] = "${item}"
        }
        list << output;
    }
} catch (Exception e) {
    println e
}

def data = [Data: list]

// Return new JSON record
return JsonOutput.toJson(data)

Note that we are only returning select data points from the original data record, namely those that we think could be useful in an application context and that an end user would appreciate.

For our Greenplum stream we’re going to ingest all of the raw data.  It then follows that we’ll be mapping many more data points. We won’t be making any unit conversions either since we’re going to want completely raw data for our analytics.

/**
  * /quakePoll/quake-gp.groovy
  * zData Inc (2016)
  */
import groovy.json.*

def slurper = new JsonSlurper()
def jsonPayload = null;
def list = []
def csv = ""

try {
    // Parse payload
    jsonPayload = slurper.parseText(payload)

    // Parse and map data
    jsonPayload.features.each {
        def output = new LinkedHashMap()

        // Get current date
        TimeZone.setDefault(TimeZone.getTimeZone('UTC'))
        def now = new Date()
        def currentDate = now.format("yyyy-MM-dd'T'HH:mm:ss:SS'Z'")


        output['read_datetime'] = "${currentDate}"
        output['rms'] = "${it.properties.rms}"
        output['code'] = "${it.properties.code}"
        output['cdi'] = "${it.properties.cdi}"
        output['sources'] = "${it.properties.sources}"
        output['nst'] = "${it.properties.nst}"
        output['tz'] = "${it.properties.tz}"
        output['title'] = "${it.properties.title}"
        output['magtype'] = "${it.properties.magType}"
        output['detail'] = "${it.properties.detail}"
        output['sig'] = "${it.properties.sig}"
        output['net'] = "${it.properties.net}"
        output['type'] = "${it.properties.type}"
        output['status'] = "${it.properties.status}"
        output['updated'] = "${it.properties.updated}"
        output['felt'] = "${it.properties.felt}"
        output['alert'] = "${it.properties.alert}"
        output['dmin'] = "${it.properties.dmin}"
        output['mag'] = "${it.properties.mag}"
        output['gap'] = "${it.properties.gap}"
        output['types'] = "${it.properties.types}"
        output['url'] = "${it.properties.url}"
        output['ids'] = "${it.properties.ids}"
        output['tsunami'] = "${it.properties.tsunami}"
        output['place'] = "${it.properties.place}"
        output['time'] = "${it.properties.time}"
        output['mmi'] = "${it.properties.mmi}"
        
        it.geometry.coordinates.eachWithIndex { item, index ->
            if (index == 0) output['longitude'] = "${item}" 
            if (index == 1) output['latitude'] = "${item}" 
            if (index == 2) output['depth'] = "${item}"
        }
        list << output;
    }
} catch (Exception e) {
    println e
}

// Convert hash map to pipe delimited string
list.each { 
    it.each {
        csv = csv + it.getValue() + "|"
    }
    csv = csv.substring(0, csv.length() - 1) 
    csv = csv + "\n"
}

// Return final string, trim off null byte for Greenplum ingestion
return csv.trim().replaceAll("\\x00",'')

Stream Definitions

Since we have two different streams, we're going to need two separate stream definitions. It's convenient to place each definition in its own file to cut down on the time it takes to deploy them in Spring XD.

// /quakePoll/gf-stream.xd

stream create --name quake-gf --definition "trigger --initialDelay=10 --fixedDelay=10 | shell --bufferSize=1000000 --command=\"python /quakePoll/quakePoll.py\" | transform --script=file:/quakePoll/quake-gf.groovy --outputType=application/json | splitter --expression=#jsonPath(payload,'$.Data') --outputType=application/json | filter --expression=payload!='' | gemfire-json-server --useLocator=true --host=master --port=10334 --regionName=quakeDemo --keyExpression=payload.getField('read_datetime')" --deploy

 

// /quakePoll/gp-stream.xd

stream create --name quake-gp --definition "trigger --initialDelay=10 --fixedDelay=10 | shell --bufferSize=1000000 --command=\"python /quakePoll/quakePoll.py\" | transform --script=file:/quakePoll/quake-gp.groovy --outputType=text/plain | filter --expression=payload!='' | gpfdist --dbHost=master --dbUser=gpadmin --dbPassword=gpadmin --dbPort=6543 --table=quake_raw --batchTimeout=5 --batchCount=5 --batchPeriod=0 --flushCount=200 --flushTime=2 --rateInterval=1000000 --columnDelimiter='|'" --deploy

You should observe that the stream definitions are exactly what we described at a higher level, just with some optional parameters included. One parameter worth focusing on is bufferSize within the shell module. It's important to have some idea of how large the data coming back from the API can be with each poll; we set bufferSize to accommodate the largest response we expect. If it's set too small, the payload will be fragmented into at least two messages on the message bus. This becomes a problem because our transformer expects a message containing a valid JSON structure, and fragmentation destroys that structure. All other parameters can be found in the Spring XD documentation, but for the sake of brevity we assume the reader can intuitively see what the stream is doing.
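If you're not sure what value to give bufferSize, one rough way to get a feel for payload sizes is to request a much wider time window than a single poll would ever cover and look at the size of the response. The snippet below is only a sizing aid (the one-day window is an arbitrary, pessimistic choice), not part of the stream:

# Rough sizing aid: how big can one poll's GeoJSON payload get?
# We pull a full day of events as a pessimistic upper bound and print the
# response size in bytes; pick a bufferSize comfortably above this figure.
import requests

url = ("http://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson"
       "&starttime=2016-01-09T00:00:00&endtime=2016-01-10T00:00:00")
response = requests.get(url)
print("%d bytes" % len(response.content))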

Greenplum Table

Lastly, we're going to have to create a table in Greenplum to store our raw data. Below is a table definition that you can use for the demo.

CREATE TABLE quake_raw (
    read_datetime TEXT,
    rms TEXT,
    code TEXT,
    cdi TEXT,
    sources TEXT,
    nst TEXT,
    tz TEXT,
    title TEXT,
    magtype TEXT,
    detail TEXT,
    sig TEXT,
    net TEXT,
    type TEXT,
    status TEXT,
    updated TEXT,
    felt TEXT,
    alert TEXT,
    dmin TEXT,
    mag TEXT,
    gap TEXT,
    types TEXT,
    url TEXT,
    ids TEXT,
    tsunami TEXT,
    place TEXT,
    time TEXT,
    mmi TEXT,
    longitude TEXT,
    latitude TEXT,
    depth TEXT
) DISTRIBUTED RANDOMLY;

Now that you’ve created your table in Greenplum we just have to change some permissions to make sure that Spring XD can read and execute the files that we have created.

$ sudo -i
# chown -R :pivotal /quakePoll

Deploying the Greenplum Stream

To begin with, let’s now test out our Greenplum stream.

$ sudo -i
# cd /opt/pivotal/spring-xd/shell/bin
# ./xd-shell

[Screenshot: the Spring XD shell prompt]

To deploy our Greenplum stream we simply issue the following command in our Spring XD shell.

xd:> script /quakePoll/gp-stream.xd

This executes the stream definition that we have already defined and saved. To verify that your stream has deployed successfully, you can issue the stream list command in your shell.

[Screenshot: stream list output showing the deployed stream]

To ensure that the data is landing in Greenplum, we can check the table. Note that since we're polling real-time earthquake data, you might not see any rows immediately; we actually have to wait for an earthquake to occur somewhere in the world before we can collect any data.

# su - gpadmin
$ psql
psql> \x on
psql> select * from quake_raw;

[Screenshot: psql output showing rows in the quake_raw table]

At this point we're convinced that we can stream data into Greenplum. Let's go ahead and destroy the stream and stop Greenplum.

$ sudo -i
# cd /opt/pivotal/spring-xd/shell/bin
# ./xd-shell
xd:> stream all destroy
xd:> quit
# su - gpadmin
$ gpstop

The above destroys the stream and shuts down the Greenplum database so that we have enough memory in our VM to sink to Geode. In order to use Geode we have to create a locator, a server, and a region.

Building and Installing Geode

To stream to Geode we'll first need to build and install it, then create a locator, server, and region. To do this we'll need to install git so we can clone the repo and then build it with Gradle. The Gradle wrapper is included in the Geode source, so there is no need to install Gradle independently.

$ sudo -i
# yum install -y git
# cd /opt/pivotal
# git clone https://github.com/apache/incubator-geode.git
# cd incubator-geode
# ./gradlew build installDist -Dskip.tests=true --stacktrace
# cd gemfire-assembly/build/install/apache-geode
# ./bin/gfsh
gfsh> start locator --name=locator
gfsh> start server --name=server
gfsh> create region --name=quakeDemo --type=REPLICATE
gfsh> quit

Deploying the Geode Stream

At this point we have a Geode locator and server running, with a region created on the server. Let's go back into the Spring XD shell and deploy our Geode stream.

$ sudo -i
# cd /opt/pivotal/spring-xd/shell/bin
# ./xd-shell
xd:> script /quakePoll/gf-stream.xd

To ensure that the data is landing in Geode, we can go back and open up gfsh.

# cd /opt/pivotal/incubator-geode/gemfire-assembly/build/install/apache-geode
# ./bin/gfsh
gfsh> connect --locator=localhost[10334]
gfsh> describe region --name=/quakeDemo

[Screenshot: describe region output for /quakeDemo]

We can even query Geode using OQL.

gfsh> query --query="select * from /quakeDemo"

[Screenshot: OQL query results in gfsh]
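If you only care about a few fields, OQL should also let you project them out of the stored JSON documents. A query along these lines (using the field names our quake-gf.groovy transformer writes) lists just the titles and magnitudes:

gfsh> query --query="select e.title, e.magnitude from /quakeDemo e"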

 

Conclusion

And there you have it: with Spring XD we're polling data from a public REST API using a custom Python script, transforming that data with custom Groovy scripts, and then landing select data points in Geode (Gemfire) and all of the raw data in Greenplum.

You can apply this method to practically any API. If you can write the code to pull data from the source of your choice, then you can stream that data using Spring XD and land it with the sink module of your choice.

Our next blog will be a continuation of Spring XD; this time we'll use Spring XD's real-time analytics features along with MADlib in Greenplum. So stay on the lookout for that!