Analyzing huge sensor data in near real-time with Apache Spark Streaming

2015-11-25

For this demo I downloaded and installed Apache Spark 1.5.1. Suppose you have a stream of data from several (industrial) machines like

```
MACHINE,TIMESTAMP,SIGNAL1,SIGNAL2,SIGNAL3,...
1,2015-01-01 11:00:01,1.0,1.1,1.2,1.3,...
2,2015-01-01 11:00:01,2.2,2.1,2.6,2.8,...
3,2015-01-01 11:00:01,1.1,1.2,1.3,1.3,...
1,2015-01-01 11:00:02,1.0,1.1,1.2,1.4,...
1,2015-01-01 11:00:02,1.3,1.2,3.2,3.3,...
...
```

Below is a system, written in Python, that reads data from a stream (use the command `nc -lk 9999` to send data to the stream) and every 10 seconds collects alerts from the signals: at least 4 suspicious values of a specific signal of the same machine.

```
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

min_occurs = 4
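# An alert fires when a machine/signal pair accumulates at least min_occurs
# suspicious readings within a single 10-second batch.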

def signals_from_1_row_to_many(row):
  """output is (machine, date, signal_number, signal_value)"""
  result = []
  for f in range(2, 21):
    result = result + [(row[0], row[1], f - 1, row[f])]
  return result
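# For example, the row ["1", "2015-01-01 11:00:01", "1.0", ..., "1.3"] becomes
# [("1", "2015-01-01 11:00:01", 1, "1.0"), ..., ("1", "2015-01-01 11:00:01", 19, "1.3")]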

def isAlert(signal, value):
  defaults = [83.0, 57.0, 37.0, 57.0, 45.0, 19.0, -223.0, 20.50, 20.42, 20.48,
              20.24, 20.22, 20.43, 20, 20.44, 20.39, 20.36, 20.25, 1675.0]
  soglia = 0.95  # "soglia" is Italian for threshold: flag deviations beyond 95% of the reference
  if value == '':
    return True
  value = float(value)
  ref = defaults[signal - 1]
  # abs(ref) keeps the band symmetric when the reference value is negative
  if value < ref - soglia * abs(ref) or value > ref + soglia * abs(ref):
    return True
  else:
    return False

def isException(machine, signal):
  # sample data: sensor 19 of machine 11 is broken
  exceptions = [(11, 19)]
  return (int(machine), signal) in exceptions
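# e.g. isException("11", 19) -> True, so readings from the broken sensor are skipped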

# Create a local StreamingContext with two working threads and a batch interval of 10 seconds

sc = SparkContext("local[2]", "SignalsAlerts")
ssc = StreamingContext(sc, 10)

# Create a DStream that will connect to hostname:port, like localhost:9999

lines = ssc.socketTextStream("localhost", 9999)
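# socketTextStream delivers one DStream element per newline-terminated line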

all_alerts = lines.map(lambda l: l.split(",")) \
                  .flatMap(signals_from_1_row_to_many) \
                  .filter(lambda s: isAlert(s[2], s[3])) \
                  .filter(lambda s: not isException(s[0], s[2])) \
                  .map(lambda s: (s[0] + '-' + str(s[2]), [(s[1], s[3])])) \
                  .reduceByKey(lambda x, y: x + y)
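# The key is "machine-signal"; reduceByKey concatenates the [(timestamp, value)]
# lists, so each key collects all of its suspicious readings for the batch.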

# Keep only machine/signal pairs with at least min_occurs suspicious values in the batch
alerts = all_alerts.filter(lambda s: len(s[1]) >= min_occurs)

alerts.pprint()  # print each batch's alerts to stdout

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
```
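To try the pipeline without `nc`, a small feeder script can play the socket source. This is a minimal sketch, not part of the original post: the `send_sample_rows` helper, the random values, and the one-second pacing are all assumptions made for illustration; it listens on localhost:9999 exactly like `nc -lk 9999` would.

```
import random
import socket
import time

def send_sample_rows(host="localhost", port=9999, machines=3, n_signals=19):
  # Hypothetical test helper: listen like `nc -lk 9999` and emit CSV rows
  # in the MACHINE,TIMESTAMP,SIGNAL1,...,SIGNAL19 format shown above.
  server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  server.bind((host, port))
  server.listen(1)
  conn, _ = server.accept()  # Spark's socketTextStream connects here
  while True:
    for machine in range(1, machines + 1):
      timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
      signals = ["%.2f" % random.uniform(0.0, 100.0) for _ in range(n_signals)]
      conn.sendall((",".join([str(machine), timestamp] + signals) + "\n").encode("utf-8"))
    time.sleep(1)  # made-up pacing: a few rows per second is enough for a demo

if __name__ == "__main__":
  send_sample_rows()
```

Start the feeder first, then launch the streaming job; every 10 seconds the job prints the machine/signal pairs that crossed the threshold.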

