Si vuole costruire un sistema di monitoraggio real-time (o near-real-time) del segnale delle stazioni radio nelle provincie siciliane. Si vuole visualizzare graficamente l'andamento del segnale ed, eventualmente, effettuare una analisi di regressione atta a predire il comportamento futuro del segnale.
In ogni provincia siciliana verrà piazzato un microcontrollore, prototipato attraverso una board Arduino MEGA 2560 a cui viene aggiunto un modulo SparkFun FM Tuner Evaluation Board Si4703.
Il modulo cattura i pacchetti RDS (Radio Data System) inviati dalle stazioni radio. Il pacchetto contiene informazioni interessanti:
Il modulo cambia periodicamente frequenza e tenta di raccogliere pacchetti RDS. In caso di successo, scrive in un file di log i seguenti dati:
Per poter leggere i dati da Arduino è stato realizzato uno script Python, il quale sfrutta la porta seriale per ottenere i dati.
import serial
from datetime import datetime
ser = serial.Serial('/dev/ttyACM0', 115200, timeout = 1000)
ser.flushInput()
current_date = datetime.now().strftime('%d-%m-%Y')
filename = f'../logs/arduino-{current_date}.log'
f = open(filename, "w")
f.close()
while True:
ser_bytes = ser.readline().decode("utf-8")[:-2] + "\n"
with open(filename,"a") as f:
f.write(ser_bytes)
È stato, inoltre, realizzato un ulteriore script Python in grado di simulare il funzionamento di un microcontrollore.
def microcontroller(province):
logpath = generate_path_by_province(province)
while(True):
with open(logpath, 'a') as log:
data = f'province={province} '
data += f'coords={province_coords.get(province)} '
data += f'FM={generate_random_frequence_by_province(province)} '
data += f'RSSI={random.randint(-120,0)} \n'
log.write(data)
time.sleep(.5)
Logstash è un progetto open-source per l'analisi dei log in tempo reale. Utilizziamo una instanza di Logstash per ogni microcontrollore, che si occupi di trasportare i dati in un canale centralizzato (data ingestion). Utilizzare un canale differente per ogni microcontrollore permette di aumentare la fault tolerance del sistema: nel caso in cui una istanza di logstash non funzioni più, le altre istanze non ne risentirebbero e continuerebbero a lavorare. Logstash trasporterà i dati su Kafka.
Secondo RedHat: "Apache Kafka è una piattaforma per il data streaming distribuita che permette di pubblicare, sottoscrivere, archiviare ed elaborare flussi di record in tempo reale. È progettata per gestire flussi di dati provenienti da più fonti distribuendoli a più consumatori. In breve, consente di spostare grandi quantità di dati da un punto qualsiasi a un altro nello stesso momento."
Kafka Stream è una libreria client per sviluppare applicazioni e microservizi, dove i dati di input e di output sono conservati in un cluster Kafka. Tali applicazioni gestiscono dati in tempo reale.
Utilizzando Kafka Stream, abbiamo sviluppato un'applicazione Java che, consultando il PI o la frequenza del segnale inviato, arricchisce il dato aggiungendo il nome della stazione. Quest'ultimo si ottiene da un lookup su delle tabelle con corrispondenze Frequenza-Stazione o PI-Stazione, estratte attraverso un processo di scraping su siti autorevoli. Il risultato viene insierito su un ulteriore topic, chiamato rds-signal-output
.
Due consumatori consumano lo streaming dallo stesso topic (rds-signal-output), ma con due group-id differenti. Essi sono:
Il primo consumatore è un semplice script in python che instanzia un Kafka consumer e redirige i dati su Elasticsearch.
kconsumer = KafkaConsumer('rds-signal-output',
client_id='kafka-to-es-consumer',
group_id ='kafka-to-es',
bootstrap_servers=['kafkaserver:9092'],
value_deserializer=json_deserializer)
elasticsearch = Elasticsearch([{'host':'elasticsearch', 'port': 9200}])
create_es_index_mapping(elasticsearch)
for message in kconsumer:
print('sending message to es.')
message = correct_message_format(message.value)
ingest_msg_to_elasticsearch(message, elasticsearch)
Elasticsearch è un motore di ricerca e analisi distribuito (open source) per tutti i tipi di dati, inclusi testuali, numerici, geospaziali, strutturati e non strutturati. Conserviamo i nostri dati all'interno di Elasticsearch poiché, in combinazione con Kibana, ci permette di eseguire le analisi proposte nei requisiti del progetto.
Il secondo consumatore è Spark, ovvero una piattaforma open source per l’elaborazione di analisi dei dati su larga scala, progettata per essere veloce e generica. Nello specifico, utilizziamo Spark Structured Streaming, che ci permette di lavorare su dati in tempo reale raggruppati all'interno di un'astrazione ad alto livello, il dataframe. Un dataframe è come una tabella contenente i dati e su cui possono essere svolte delle operazioni.
Attraverso Spark structured streaming otteniamo un dataframe (in streaming) contenente i dati trasmessi in tempo reale da Kafka.
def get_rds_signal_stream(schema: StructType):
return spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'kafkaserver:9092') \
.option('subscribe', 'rds-signal-output') \
.option("kafka.group.id", "spark-consumer") \
.load() \
.select('timestamp', 'value') \
.withColumn('time', to_timestamp('timestamp', 'YYYY/MM/DD hh:mm:ss')) \
.withColumn('json_content', col('value').cast('string')) \
.withColumn('content', from_json(col('json_content'), schema)) \
.select(col('time'), col('content.*')) \
.withColumn('milliseconds', unix_timestamp('@timestamp', format="yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) \
.select(
col('time'),
col('station_name'),
col('RSSI').cast('long'),
col('@timestamp'),
col('province'),
col('FM'),
col('coords'),
col('milliseconds')
)
Raggruppiamo tali dati per provincia, per stazione e per una finestra temporale di 1 minuto. Ciò implica che il dataframe è diviso in tanti gruppi, dove ogni gruppo contiene dati in una determinata finestra temporale, appartenenti ad una sola provincia ed emessi da una sola stazione radio.
win = window(signalStream.time, "1 minutes")
signalStream \
.groupBy('province', 'station_name', win) \
.applyInPandas(predict, get_resulting_df_schema()) \
.writeStream \
.option('checkpointLocation', '/save/location') \
.format('es') \
.start('rds-signal-output-spark') \
.awaitTermination()
Ad ogni gruppo applichiamo una funzione che esegue una analisi di regressione lineare, attraverso scikit-learn, in cui la variabile indipendente è il timestamp del segnale, mentre la variabile dipendente è l'RSSI. Una volta trovata la migliore retta di approssimazione, si prevede l'RSSI per i 5 minuti successivi. L'output di Spark sarà proiettato nel futuro e cercherà di predire la forza del segnale emesso dalle stazioni. I dati predetti sono depositati su elasticsearch in un indice chiamato "rds-signal-output-spark".
Kibana è l'anello mancante...
È un’interfaccia web estensibile per la presentazione visiva dei dati raccolti. Insieme ad Elasticsearch e allo strumento di elaborazione Logstash forma il cosiddetto stack ELK.
Attraverso Kibana aggreghiamo i dati dei microcontrollori dall'indice "rds-signal-output" con i dati predetti da Spark in "rds-signal-output-spark" in un unico pattern "rds-signal-output*".
Creiamo delle dashboard che ci permettono di visualizzare l'andamento dei dati, la predizione e altre preziose informazioni, il tutto in tempo reale. Vediamo qualche grafico d'esempio!
Coded by Lemuel Puglisi e Luigi Seminara. |