
A Real-Time Streaming Project with Smartphone Data | by Harrison Hoffman | Dec, 2022


Devices are everywhere. Smartphones, refrigerators, doorbells, watches, medical sensors, security systems, and fitness trackers, to name a few, are now commonplace and constantly recording (potentially high-frequency) information. These devices form a network known as the “Internet of Things”, or IoT, and provide rich data sources.

I recently became curious about how this data is ingested, processed, and stored. While resources on this topic are plentiful, few give examples with real-life data accessible to anyone. As I searched from article to article to learn about event-driven systems and streaming technologies like Apache Kafka, I came across a smartphone app, Sensor Logger, that allows users to stream data from motion-related sensors on their phones. Since I have a smartphone, this seemed like the perfect way to learn, and the “smartphone_sensor_stream” project was born. In this project, we use FastAPI, Kafka, QuestDB, and Docker to visualize real-time sensor data on a dashboard.

In this article, we will go over all of the main components of this project at a high level. Everything needed to run the project locally is available on GitHub, and a quick demo is available on YouTube. As a disclaimer, I am very much a beginner in event-driven systems and would greatly appreciate feedback to further my knowledge. Enjoy!

Let’s start by looking at the architecture for this project (i.e. how data will flow from smartphones to a dashboard):

Project Architecture. Image by Author (made with Lucidchart).

Each smartphone sends sensor readings (accelerometer, gyroscope, and magnetometer) via a POST request to a FastAPI application. FastAPI, the producer, asynchronously writes the sensor readings to a Kafka topic as JSON (the data from the request body). Each JSON object is ingested by a Python process, the consumer, and stored in a QuestDB table. Once data is in the database, it is accessible to any downstream service or application that depends on it. For part 1 of this project, we will plot sensor readings on a dashboard using server-sent events (SSE).

This project is a collection of small services that interface with one another to get data from smartphones to the dashboard. Here’s the directory structure:

|-producer
| |-app
| | |-core
| | | |-config.py
| | |-__init__.py
| | |-schemas
| | | |-sensors.py
| | |-main.py
| |-requirements.txt
| |-Dockerfile
| |-entrypoint.sh
|-db_consumer
| |-app
| | |-core
| | | |-config.py
| | |-models
| | | |-sensors.py
| | |-db
| | | |-ingress.py
| | |-main.py
| |-requirements.txt
| |-Dockerfile
| |-entrypoint.sh
|-ui_server
| |-app
| | |-core
| | | |-config.py
| | |-models
| | | |-sensors.py
| | |-static
| | | |-js
| | | | |-main.js
| | |-db
| | | |-data_api.py
| | |-templates
| | | |-index.html
| | |-main.py
| |-requirements.txt
| |-Dockerfile
| |-entrypoint.sh
|-README.md
|-.gitignore
|-.env
|-docker-compose.yml

We will write three services: the producer, the consumer, and the UI. Each service is packaged with a Dockerfile and orchestrated via docker-compose. Docker-compose allows us to run the services we write, along with external services (Kafka, Zookeeper, and QuestDB), as individual containers connected through an internal network. Everything we need to orchestrate the services in this project is in a docker-compose file:

Docker-Compose File for the Project.

Notice the four services we don’t write ourselves (thankfully): Zookeeper, Kafka, QuestDB, and Kafka-UI. These services work together with the producer, consumer, and UI to create the project. We will walk through each service individually, but first, we need to understand the data source.

Sensor Logger is an iOS and Android app that allows users to log motion-related sensor readings from their smartphones. Users can view real-time sensor readings, export data as files, and push live data to a server via HTTP. This project leverages the HTTP functionality to extract sensor readings. To configure Sensor Logger, start by making sure all of the following sensors are selected:

Select Sensors. Image by Author.

We will ingest readings from the phone’s accelerometer, gyroscope, and magnetometer. Next, we need to configure Sensor Logger’s settings so that it knows where to push data:

Sensor Logger Settings. Image by Author.

The most crucial detail is to make sure the “Push URL” is correct — this is the endpoint of the FastAPI producer that accepts raw sensor readings via POST requests. We’ll use our computer as a server, so we need to identify the corresponding IP address. On a Mac, this is under System Preferences -> Network:

Find the IP Address of a Mac. Image by Author.

Note that the IP address of a computer is typically unique to a Wi-Fi network, meaning a new IP address is allocated each time a computer connects to a new network. Therefore, it’s crucial that the smartphone and the host computer are on the same network. The FastAPI producer accepts sensor readings at:

http://{your_ip_address}:8000/phone-producer

Paste the above URL into the “Push URL” box, and Sensor Logger should be ready to go!

This article won’t go into much detail about Kafka since there are many available resources on the platform. However, as a summary, Kafka is a highly performant framework for storing and reading streaming data. Kafka’s fundamental data structure is the log. Applications that write messages to the log are called producers. Unlike a queue, messages in the log persist even after being read — this allows multiple applications (known as consumers) to read concurrently from different positions. For simplicity, this project only has one producer (the FastAPI application that writes raw sensor readings to Kafka) and one consumer (a Python process that reads messages from Kafka and formats them in a database). Zookeeper is a service that helps manage the various components of Kafka.

Only two docker images are needed to get Kafka and Zookeeper running locally:

Docker-Compose Section for Kafka and Zookeeper.
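A minimal sketch of this section, assuming the Bitnami images and the Provectus kafka-ui container (the image tags and listener settings here are assumptions; the project's exact file is on GitHub):

version: "3.8"

services:
  zookeeper:
    image: bitnami/zookeeper:latest
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: bitnami/kafka:latest
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "18080:8080"  # web UI served at http://localhost:18080/
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
    depends_on:
      - kafka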

We’ll use the Bitnami distribution of Kafka and Zookeeper. The Kafka-UI image allows users to interact with Kafka clusters through a web application but is not required for this project. Save the above docker-compose file as docker-compose.yml, run docker-compose up, and a GUI similar to the following should be available at http://localhost:18080/:

Kafka UI. Image by Author.

Information about brokers, topics, and consumers will be added to this dashboard as components are added to the system.

So far, we have Sensor Logger configured to publish raw sensor readings to a server and a Kafka instance ready to receive those readings. The next step is to create a bridge between the raw data and Kafka — the producer. The producer in this project is a FastAPI application that accepts data sent from smartphones and writes it to the Kafka log. Here’s the structure of the producer:

|-producer
| |-app
| | |-core
| | | |-config.py
| | |-__init__.py
| | |-schemas
| | | |-sensors.py
| | |-main.py
| |-requirements.txt
| |-Dockerfile
| |-entrypoint.sh

We won’t go through every file in the producer directory since everything is available on GitHub. Instead, let’s take a look at main.py (the driving script of the producer API):

main.py File for the Producer.
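A condensed sketch of this file, assuming aiokafka's AIOKafkaProducer and the schema/config modules from the directory listing (the line numbers in the next paragraph refer to the full file on GitHub, not this sketch):

import json

from aiokafka import AIOKafkaProducer
from fastapi import FastAPI

from app.core.config import settings           # Pydantic settings object (config.py)
from app.schemas.sensors import SensorReading  # request-body schema (sensors.py)

app = FastAPI()
producer = AIOKafkaProducer(
    bootstrap_servers=f"{settings.KAFKA_HOST}:{settings.KAFKA_PORT}"
)


@app.on_event("startup")
async def startup_event() -> None:
    # Connect to the Kafka broker when the API starts.
    await producer.start()


@app.on_event("shutdown")
async def shutdown_event() -> None:
    await producer.stop()


@app.post("/phone-producer")
async def phone_producer(data: SensorReading) -> dict:
    # Enqueue the raw reading for Kafka without blocking the event loop;
    # aiokafka batches and delivers it in the background.
    await producer.send(settings.TOPIC_NAME, json.dumps(data.dict()).encode("utf-8"))
    return {"messageId": data.messageId}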

Line 9 instantiates a FastAPI object. Lines 11–17 create an instance of a Kafka producer object with Aiokafka. Aiokafka allows us to write messages to Kafka asynchronously, meaning we don’t have to wait for Kafka to receive and process the message (in line 45) before we move on to the next line of code. Instead, Aiokafka sends the current message to Kafka and is almost instantly ready to produce another message. Lines 27–55 define the route that will receive raw sensor readings. To understand this better, let’s take a look at the request body format that this route expects (the data argument):

{"messageId": 20,
"sessionId": "4bf3b3b9-a241-4aaa-b1d3-c05100df9976",
"deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b",
"payload": [{"name": "accelerometeruncalibrated",
"time": "1671406719721160400",
"values": {"z": -0.9372100830078125,
"y": -0.3241424560546875,
"x": 0.0323486328125}},
{"name": "magnetometeruncalibrated",
"time": "1671406719726579500",
"values": {"z": -5061.64599609375,
"y": 591.083251953125,
"x": 3500.541015625}},
{"name": "gyroscopeuncalibrated",
"time": "1671406719726173400",
"values": {"z": -0.004710599314421415,
"y": -0.013125921599566936,
"x": 0.009486978873610497}},
...
]}

Each request body is a JSON object with entries “messageId”, “sessionId”, “deviceId”, and “payload”. Smartphones are uniquely identified by their “deviceId”. Every time a phone starts a new stream, a new “sessionId” is created for it. The “messageId” entry indicates the order a message falls in the sequence from the current session. The “payload” entry is an array of JSON objects that contain readings for each sensor configured in Sensor Logger. Every “payload” entry has the sensor’s name, the time the reading was recorded (in Unix time), and the reading itself. We’re working exclusively with triaxial sensors, so every sensor should have “x”, “y”, and “z” readings corresponding to the three spatial dimensions.

The FastAPI route writes a raw request body directly to the Kafka topic (in line 45), and metadata is logged and returned in lines 47–55. This route is exposed at http://{your_ip_address}:8000/phone-producer, as discussed in the Sensor Logger section. All requests are validated by the Pydantic SensorReading object (i.e. any request that doesn’t match Sensor Logger’s format won’t be processed by the route):

sensors.py File for the Producer.
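A sketch of that schema, inferred from the request body shown above (the nested model name is an assumption; only SensorReading is named in the article):

from typing import Dict, List

from pydantic import BaseModel


class SensorEntry(BaseModel):
    # One reading from one sensor: its name, a Unix-time string, and x/y/z values.
    name: str
    time: str
    values: Dict[str, float]


class SensorReading(BaseModel):
    # Top-level request body sent by Sensor Logger.
    messageId: int
    sessionId: str
    deviceId: str
    payload: List[SensorEntry]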

Configuration for the producer is handled through environment variables that are read in by a Pydantic BaseSettings object:

config.py File for the Producer.
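A minimal version of this file, assuming the variables in the .env file below map one-to-one onto settings fields (Pydantic v1 syntax):

from pydantic import BaseSettings


class Settings(BaseSettings):
    # Values are read from the environment (or a .env file) at instantiation.
    PROJECT_NAME: str
    TOPIC_NAME: str
    KAFKA_HOST: str
    KAFKA_PORT: int


settings = Settings()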

Environment variables are stored in a .env file:

# Kafka config
PROJECT_NAME=phone_stream_producer
TOPIC_NAME=raw-phone-stream
KAFKA_HOST=kafka
KAFKA_PORT=9092

and passed to the producer in the docker-compose file (line 9 below):

Docker-Compose Section for the Producer.
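A minimal stanza along these lines (the build context and port mapping are assumptions drawn from the rest of the article; the exact section is on GitHub):

producer:
  build: ./producer
  env_file:
    - .env
  ports:
    - "8000:8000"  # exposed so phones on the local network can reach the API
  depends_on:
    - kafka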

Notice the host argument in the start-up command is 0.0.0.0. This is what allows the producer to be accessed by its IP address from any machine on the local network.
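That start-up command lives in the producer's entrypoint.sh; a sketch of it, assuming uvicorn as the ASGI server:

#!/bin/bash
# Bind to 0.0.0.0 so the API is reachable by IP from other devices on the network.
uvicorn app.main:app --host 0.0.0.0 --port 8000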

We now have the infrastructure to stream sensor data from smartphones to the FastAPI producer and Kafka. The next step is to create a process (the consumer) that reads from Kafka and does something with the data. Consumers can be responsible for anything related to reading and manipulating data stored in the log. The consumer for this project will be used to transform raw sensor readings and store them in a time series database known as QuestDB. Here’s the directory structure for the consumer:

|-db_consumer
| |-app
| | |-core
| | | |-config.py
| | |-models
| | | |-sensors.py
| | |-db
| | | |-ingress.py
| | |-main.py
| |-requirements.txt
| |-Dockerfile
| |-entrypoint.sh

Before creating the consumer, we need to stand up a QuestDB instance. QuestDB is a highly performant open-source time series database with a Postgres-compatible API. This means we can query QuestDB tables as if they were row-oriented Postgres tables while reaping the benefits of column-oriented tables. We can run QuestDB using Docker:

Docker-Compose Section for QuestDB.
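A sketch of that section (the image tag is an assumption; the line numbers in the next paragraph refer to the full file; the port roles follow QuestDB's documentation):

questdb:
  image: questdb/questdb:latest
  expose:
    - "9000"  # REST API / CSV import (writes)
    - "9009"  # InfluxDB line protocol
    - "9003"  # health and metrics
  ports:
    - "8812:8812"  # Postgres wire protocol (queries)
  environment:
    - QDB_PG_USER=${DB_USER}
    - QDB_PG_PASSWORD=${DB_PASSWORD}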

Notice in lines 5–8 that we’re exposing ports 9000, 9009, and 9003. These ports, particularly port 9000, are used to write data to QuestDB tables. By including these ports in the expose section, and not in the ports section, we ensure that only containers running in the same Docker network can write data. Port 8812 is accessible outside the Docker network and is used to query data. The environment variables QDB_PG_USER and QDB_PG_PASSWORD, along with other QuestDB-related variables, are set in the .env file:

# Questdb config
DB_USER=admin
DB_PASSWORD=quest
DB_HOST=questdb
DB_PORT=8812
DB_IMP_PORT=9000
DB_NAME=qdb
DB_TRIAXIAL_OFFLOAD_TABLE_NAME=device_offload

The driving code of the consumer is in main.py:

main.py File for the Consumer.
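The heart of this file is an async consume loop, sketched here with aiokafka's AIOKafkaConsumer and a hypothetical insert helper (the line numbers in the next paragraph refer to the full file on GitHub):

import asyncio
import json

from aiokafka import AIOKafkaConsumer

from app.core.config import settings
from app.db.ingress import insert_sensor_payload  # hypothetical name for the write function


async def consume() -> None:
    consumer = AIOKafkaConsumer(
        settings.TOPIC_NAME,
        bootstrap_servers=f"{settings.KAFKA_HOST}:{settings.KAFKA_PORT}",
    )
    await consumer.start()
    try:
        # Yields messages as they arrive and waits (asynchronously) whenever
        # the topic has no new messages.
        async for message in consumer:
            insert_sensor_payload(json.loads(message.value))
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(consume())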

There’s a lot to unpack here, but the core logic happens in lines 35–39. The consumer asynchronously loops through messages in the specified Kafka topic. This loop will continuously consume messages as long as the topic is updated. Messages are formatted and written to a QuestDB table using the following function:

Function to Write a Sensor Payload to a Table.

The entire payload is formatted and stored as a CSV file in memory using StringIO. From there, the CSV is sent via a POST request to the QuestDB write port. This facilitates quickly writing an entire payload to QuestDB using a single connection and request.
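Under those assumptions, the function might look roughly like this (the helper name and settings fields are assumptions; /imp is QuestDB's REST CSV-import endpoint):

import csv
from datetime import datetime
from io import StringIO

import requests

from app.core.config import settings


def insert_sensor_payload(data: dict) -> None:
    # Build an in-memory CSV with one row per sensor entry in the payload.
    buffer = StringIO()
    writer = csv.writer(buffer)
    writer.writerow([
        "device_id", "session_id", "device_timestamp",
        "recorded_timestamp", "sensor_name", "x", "y", "z",
    ])
    recorded_timestamp = datetime.utcnow().isoformat()
    for entry in data["payload"]:
        writer.writerow([
            data["deviceId"], data["sessionId"], entry["time"],
            recorded_timestamp, entry["name"],
            entry["values"]["x"], entry["values"]["y"], entry["values"]["z"],
        ])
    # One POST to QuestDB's CSV import endpoint writes the whole payload.
    requests.post(
        f"http://{settings.DB_HOST}:{settings.DB_IMP_PORT}/imp",
        params={"name": settings.DB_TRIAXIAL_OFFLOAD_TABLE_NAME},
        files={"data": buffer.getvalue()},
    )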

The table that stores sensor data is designed to balance quick writes with quick reads. Here’s the query to create the table in QuestDB:

CREATE TABLE IF NOT EXISTS device_offload (
device_id TEXT,
session_id TEXT,
device_timestamp TEXT,
recorded_timestamp TEXT,
sensor_name TEXT,
x REAL,
y REAL,
z REAL
)

The device_id and session_id fields come directly from the first two entries of the raw payload, as discussed previously. device_timestamp is the time that an individual sample of sensor data was collected on the device, while recorded_timestamp is the time the sample hit the database. Because of this, we can measure how long it takes for a sample of data to get from the device to the database. Since we’re only working with triaxial sensors, we can store their values in the x, y, and z fields, and specify which sensor each sample belongs to in the sensor_name field. This schema allows us to write data from every sensor in a payload to the same table in a single write (as opposed to writing to multiple tables requiring multiple writes).

It’s important to note that, in a real-world setting, this QuestDB table would likely not be the final storage destination of the data. Instead, this table would act as a buffer, enabling applications to readily access data in a structured format. High-frequency sensor data (50 Hz in our case) quickly grows and becomes difficult to maintain. We would likely introduce another Kafka pipeline responsible for moving old data out of QuestDB and into an archive.

The final step for this consumer is to add the corresponding docker-compose commands:

Docker-Compose Section for the Consumer.

We now have everything in place to visualize the sensor data as it’s written to QuestDB. To do this, we need to stand up another FastAPI application that polls the database and uses server-sent events (SSE) to update an HTML page. Here’s the final directory structure to examine:

|-ui_server
| |-app
| | |-core
| | | |-config.py
| | |-models
| | | |-sensors.py
| | |-static
| | | |-js
| | | | |-main.js
| | |-db
| | | |-data_api.py
| | |-templates
| | | |-index.html
| | |-main.py
| |-requirements.txt
| |-Dockerfile
| |-entrypoint.sh

As before, main.py is the driver for this app:

SSE Route for the Dashboard.
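A sketch of the SSE route, assuming the sse-starlette package and a hypothetical query helper (the route name and helper signature are assumptions; the line numbers in the next paragraph refer to the full file on GitHub):

import asyncio
import json

from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

from app.db.data_api import get_accelerometer_readings  # hypothetical query helper

app = FastAPI()


@app.get("/chart-data")  # route name assumed
async def message_stream(request: Request) -> EventSourceResponse:
    async def event_generator():
        while True:
            if await request.is_disconnected():
                break
            # Latest second of accelerometer data for recently active phones.
            rows = get_accelerometer_readings(
                lookback_seconds=1, max_lookback_seconds=60
            )
            yield {"data": json.dumps(rows)}
            await asyncio.sleep(0.1)  # poll the database every 0.1 seconds

    return EventSourceResponse(event_generator())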

Every 0.1 seconds (line 90), the message_stream function will query the database for the most recent second of sensor readings (line 62). In this iteration of the dashboard, only accelerometer data is queried and displayed. The max_lookback_seconds argument is set to 60 — meaning that any phones that haven’t sent data in the last 60 seconds will be filtered out in the query. Hence, this dashboard will display the most recent second of accelerometer data for all phones that have sent data in the last minute. Here’s the query logic:

Query to Get Data for the Dashboard.

Add the required lines to the docker-compose file:

Docker-Compose Section for the Dashboard.

And the dashboard should be available at http://localhost:5000:

Dashboard Displaying Live Sensor Data. Image by Author.

This article gave a high-level overview of a real-time streaming project with a data source that most people have access to (smartphones). While there are many moving parts here, we merely peeked into the world of data streaming. Perhaps in a future article we can improve upon this project and add more consumers. As always, feedback is greatly appreciated. Thanks for reading!

Like my articles? Buy me a coffee: https://www.buymeacoffee.com/HarrisonfhU

References

Apache Kafka: https://kafka.apache.org/

Event-Driven Architectures - The Queue vs The Log: https://jack-vanlightly.com/blog/2018/5/20/event-driven-architectures-the-queue-vs-the-log

Lucidchart: https://www.lucidchart.com/

Kafka PoC using FastAPI: https://github.com/GavriloviciEduard/fastapi-kafka

geo-stream-kafka: https://github.com/iwpnd/geo-stream-kafka

18 Most Popular IoT Devices in 2022: https://www.softwaretestinghelp.com/iot-devices/

FastAPI: https://fastapi.tiangolo.com/

QuestDB: https://questdb.io/docs/

Row vs Column Oriented Databases: https://dataschool.com/data-modeling-101/row-vs-column-oriented-databases/
