fmap (Frequence Mapper)



Introduzione¶

Si vuole costruire un sistema di monitoraggio real-time (o near-real-time) del segnale delle stazioni radio nelle provincie siciliane. Si vuole visualizzare graficamente l'andamento del segnale ed, eventualmente, effettuare una analisi di regressione atta a predire il comportamento futuro del segnale.

Data flow¶

Data sources¶

In ogni provincia siciliana verrà piazzato un microcontrollore, prototipato attraverso una board Arduino MEGA 2560 a cui viene aggiunto un modulo SparkFun FM Tuner Evaluation Board Si4703.

Il modulo cattura i pacchetti RDS (Radio Data System) inviati dalle stazioni radio. Il pacchetto contiene informazioni interessanti:

  • PI (Programme Identifier): identifica univocamente una stazione radio con 4 caratteri esadecimali;
  • RSSI (Received Signal Strength Indication) un numero che va da -120 a 0 ed indica la qualità del segnale (che migliora con l'avvicinarsi allo 0).

Il modulo cambia periodicamente frequenza e tenta di raccogliere pacchetti RDS. In caso di successo, scrive in un file di log i seguenti dati:

  • RSSI;
  • frequenza;
  • PI (anche se quest'ultimo può mancare nel caso di un segnale particolarmente debole).

Per poter leggere i dati da Arduino è stato realizzato uno script Python, il quale sfrutta la porta seriale per ottenere i dati.

arduino/serial_reading.py¶

import serial
from datetime import datetime


ser = serial.Serial('/dev/ttyACM0', 115200, timeout = 1000)
ser.flushInput()

current_date = datetime.now().strftime('%d-%m-%Y')
filename = f'../logs/arduino-{current_date}.log'
f = open(filename, "w")
f.close() 

while True:
    ser_bytes = ser.readline().decode("utf-8")[:-2] + "\n"
    with open(filename,"a") as f:
        f.write(ser_bytes)

È stato, inoltre, realizzato un ulteriore script Python in grado di simulare il funzionamento di un microcontrollore.

simulator/app.py¶

def microcontroller(province): 
    logpath = generate_path_by_province(province)
    while(True):
        with open(logpath, 'a') as log:
            data  = f'province={province} '
            data += f'coords={province_coords.get(province)} '
            data += f'FM={generate_random_frequence_by_province(province)} '
            data += f'RSSI={random.randint(-120,0)} \n'            
            log.write(data)
            time.sleep(.5)

Data Ingestion¶

Logstash è un progetto open-source per l'analisi dei log in tempo reale. Utilizziamo una instanza di Logstash per ogni microcontrollore, che si occupi di trasportare i dati in un canale centralizzato (data ingestion). Utilizzare un canale differente per ogni microcontrollore permette di aumentare la fault tolerance del sistema: nel caso in cui una istanza di logstash non funzioni più, le altre istanze non ne risentirebbero e continuerebbero a lavorare. Logstash trasporterà i dati su Kafka.

Data streaming e Data enrichment¶

Cosa è Kafka?¶

Secondo RedHat: "Apache Kafka è una piattaforma per il data streaming distribuita che permette di pubblicare, sottoscrivere, archiviare ed elaborare flussi di record in tempo reale. È progettata per gestire flussi di dati provenienti da più fonti distribuendoli a più consumatori. In breve, consente di spostare grandi quantità di dati da un punto qualsiasi a un altro nello stesso momento."

Cosa è Kafka Stream?¶

Kafka Stream è una libreria client per sviluppare applicazioni e microservizi, dove i dati di input e di output sono conservati in un cluster Kafka. Tali applicazioni gestiscono dati in tempo reale.

Cosa è stato fatto?¶

Utilizzando Kafka Stream, abbiamo sviluppato un'applicazione Java che, consultando il PI o la frequenza del segnale inviato, arricchisce il dato aggiungendo il nome della stazione. Quest'ultimo si ottiene da un lookup su delle tabelle con corrispondenze Frequenza-Stazione o PI-Stazione, estratte attraverso un processo di scraping su siti autorevoli. Il risultato viene insierito su un ulteriore topic, chiamato rds-signal-output.

Consumers¶

Due consumatori consumano lo streaming dallo stesso topic (rds-signal-output), ma con due group-id differenti. Essi sono:

  • Python app (trasporto dei dati su ES);
  • Spark Structured Streaming.

Data indexing¶

Il primo consumatore è un semplice script in python che instanzia un Kafka consumer e redirige i dati su Elasticsearch.

kafka-to-es/connect.py¶

kconsumer = KafkaConsumer('rds-signal-output',
    client_id='kafka-to-es-consumer', 
    group_id ='kafka-to-es', 
    bootstrap_servers=['kafkaserver:9092'],
    value_deserializer=json_deserializer)

elasticsearch = Elasticsearch([{'host':'elasticsearch', 'port': 9200}])
create_es_index_mapping(elasticsearch)

for message in kconsumer:
    print('sending message to es.')
    message = correct_message_format(message.value)
    ingest_msg_to_elasticsearch(message, elasticsearch)

Ma... Cosa è Elasticsearch?¶

Elasticsearch è un motore di ricerca e analisi distribuito (open source) per tutti i tipi di dati, inclusi testuali, numerici, geospaziali, strutturati e non strutturati. Conserviamo i nostri dati all'interno di Elasticsearch poiché, in combinazione con Kibana, ci permette di eseguire le analisi proposte nei requisiti del progetto.

Data processing e machine learning¶

Il secondo consumatore è Spark, ovvero una piattaforma open source per l’elaborazione di analisi dei dati su larga scala, progettata per essere veloce e generica. Nello specifico, utilizziamo Spark Structured Streaming, che ci permette di lavorare su dati in tempo reale raggruppati all'interno di un'astrazione ad alto livello, il dataframe. Un dataframe è come una tabella contenente i dati e su cui possono essere svolte delle operazioni.

Attraverso Spark structured streaming otteniamo un dataframe (in streaming) contenente i dati trasmessi in tempo reale da Kafka.

def get_rds_signal_stream(schema: StructType):
    return spark.readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'kafkaserver:9092') \
        .option('subscribe', 'rds-signal-output') \
        .option("kafka.group.id", "spark-consumer") \
        .load() \
        .select('timestamp', 'value') \
        .withColumn('time', to_timestamp('timestamp', 'YYYY/MM/DD hh:mm:ss')) \
        .withColumn('json_content', col('value').cast('string')) \
        .withColumn('content', from_json(col('json_content'), schema)) \
        .select(col('time'), col('content.*')) \
        .withColumn('milliseconds', unix_timestamp('@timestamp', format="yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) \
        .select(
            col('time'), 
            col('station_name'), 
            col('RSSI').cast('long'), 
            col('@timestamp'), 
            col('province'), 
            col('FM'), 
            col('coords'),
            col('milliseconds')
        )

Raggruppiamo tali dati per provincia, per stazione e per una finestra temporale di 1 minuto. Ciò implica che il dataframe è diviso in tanti gruppi, dove ogni gruppo contiene dati in una determinata finestra temporale, appartenenti ad una sola provincia ed emessi da una sola stazione radio.

win = window(signalStream.time, "1 minutes")
signalStream \
    .groupBy('province', 'station_name', win) \
    .applyInPandas(predict, get_resulting_df_schema()) \
    .writeStream \
    .option('checkpointLocation', '/save/location') \
    .format('es') \
    .start('rds-signal-output-spark') \
    .awaitTermination()

Ad ogni gruppo applichiamo una funzione che esegue una analisi di regressione lineare, attraverso scikit-learn, in cui la variabile indipendente è il timestamp del segnale, mentre la variabile dipendente è l'RSSI. Una volta trovata la migliore retta di approssimazione, si prevede l'RSSI per i 5 minuti successivi. L'output di Spark sarà proiettato nel futuro e cercherà di predire la forza del segnale emesso dalle stazioni. I dati predetti sono depositati su elasticsearch in un indice chiamato "rds-signal-output-spark".

Data Visualization¶

Kibana è l'anello mancante...

È un’interfaccia web estensibile per la presentazione visiva dei dati raccolti. Insieme ad Elasticsearch e allo strumento di elaborazione Logstash forma il cosiddetto stack ELK.

Attraverso Kibana aggreghiamo i dati dei microcontrollori dall'indice "rds-signal-output" con i dati predetti da Spark in "rds-signal-output-spark" in un unico pattern "rds-signal-output*".

Creiamo delle dashboard che ci permettono di visualizzare l'andamento dei dati, la predizione e altre preziose informazioni, il tutto in tempo reale. Vediamo qualche grafico d'esempio!



Coded by Lemuel Puglisi e Luigi Seminara.