Introduction
MQTT brokers connect IoT devices and applications through a publish-subscribe messaging pattern, making them essential for modern IoT infrastructure. Coreflux is a low-code MQTT broker that adds real-time data processing and transformation capabilities, allowing you to integrate directly with managed databases including MongoDB, PostgreSQL, MySQL, and OpenSearch without writing custom integration code.
What you'll learn: This tutorial walks you through deploying a complete IoT data pipeline—from setting up a managed database cluster and Coreflux MQTT broker on the cloud provider, to configuring secure VPC networking, building data transformation models using Coreflux's Language of Things (LoT), and automatically storing processed IoT data in your chosen database. You'll end up with a production-ready setup that handles real-time messaging and persistent storage for IoT applications.
Key Takeaways
Before diving into the step-by-step deployment process, here are the key points you'll learn:
- Deploy a managed database cluster (PostgreSQL, MongoDB, MySQL, or OpenSearch) on the cloud provider for scalable IoT data storage.
- Set up the Coreflux MQTT broker on a cloud server using the Marketplace image or Docker.
- Create secure VPC networking to connect your MQTT broker and database without public exposure.
- Build real-time data pipelines using Coreflux's Language of Things (LoT) for low-code IoT automation.
- Transform and store IoT data automatically from MQTT topics to database tables, collections, or indexes.
- Verify end-to-end data flow from simulated sensors through transformation models to database storage.
This tutorial provides a production-ready foundation for IoT applications that need real-time messaging combined with persistent data storage and advanced capabilities like search, analytics, or relational queries.
What You Will Build
By the end of this automation guide, you will have deployed:
- A managed database cluster (PostgreSQL, MongoDB, MySQL, or OpenSearch) for scalable storage
- A cloud server running the Coreflux MQTT broker
- A Virtual Private Cloud (VPC) network for secure IoT communication
- Real-time data simulation using the LoT Notebook extension
- Low-code data transformation models and database integration routes
- Complete Data Integration & Transformation pipeline for IoT automation
Coreflux & the cloud provider Partnership
Coreflux provides a lightweight MQTT broker and data pipeline tools, programmed through its Language of Things (LoT), for efficient IoT communication on the cloud provider.
What is MQTT?
MQTT (Message Queuing Telemetry Transport) is a lightweight, publish-subscribe network protocol widely adopted in IoT ecosystems. Designed for constrained devices and low-bandwidth, high-latency, or unreliable networks, MQTT enables efficient, real-time messaging in bandwidth-constrained environments.
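To make "lightweight" concrete, the sketch below hand-encodes an MQTT 3.1.1 PUBLISH packet at QoS 0 using only the Python standard library. The helper names (`encode_remaining_length`, `build_publish_packet`) are illustrative and not part of any library; in practice an MQTT client library does this for you. The topic is the one used by the simulator later in this tutorial.

```python
def encode_remaining_length(length: int) -> bytes:
    """Encode the MQTT 'remaining length' field as a variable-length integer."""
    if not 0 <= length <= 268_435_455:
        raise ValueError("remaining length out of range for MQTT")
    encoded = bytearray()
    while True:
        digit = length % 128
        length //= 128
        if length > 0:
            digit |= 0x80  # continuation bit: more bytes follow
        encoded.append(digit)
        if length == 0:
            return bytes(encoded)


def build_publish_packet(topic: str, payload: bytes) -> bytes:
    """Build a QoS 0 PUBLISH packet (no packet identifier, no retain/dup flags)."""
    topic_bytes = topic.encode("utf-8")
    # Variable header: 2-byte big-endian topic length followed by the topic itself.
    variable_header = len(topic_bytes).to_bytes(2, "big") + topic_bytes
    body = variable_header + payload
    # Fixed header: packet type 3 (PUBLISH) in the high nibble, flags all zero.
    fixed_header = bytes([0x30]) + encode_remaining_length(len(body))
    return fixed_header + body


packet = build_publish_packet("raw_data/machine1", b"7")
print(len(packet), "bytes on the wire")
```

For this one-character payload the whole packet is 22 bytes: 2 bytes of fixed header, 2 bytes of topic length, 17 bytes of topic, and 1 byte of payload.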
About Coreflux
Coreflux offers a lightweight MQTT broker to facilitate efficient, real-time communication between IoT devices and applications, including real-time data transformation capabilities necessary for each use-case. Built for scalability and reliability, Coreflux is tailored for environments where low latency and high throughput are critical.
Coreflux handles message routing and data flow between devices, whether you're building a small IoT project or deploying an industrial monitoring system.
With Coreflux on the cloud provider, you get:
- Data Processing: Centralize data processing where your data lives, so it can be processed in real time.
- Data Integration: Integrate with other cloud provider services such as Managed Databases (PostgreSQL, MongoDB, MySQL, or OpenSearch), keeping all your data in a single ecosystem.
- Scalability: Handle growing amounts of data and devices without compromising performance.
- Reliability: Ensure consistent and dependable messaging across all connected devices.
Prerequisites
Before you begin this MQTT broker deployment tutorial, you'll need:
- A cloud provider account with billing enabled
- Understanding of MQTT protocol concepts and IoT architecture (see our introduction to MQTT for background)
- Visual Studio Code (for LoT Notebook extension)
Estimated time: This tutorial takes approximately 30-45 minutes to complete, depending on database provisioning time (typically 1-5 minutes per database cluster).
Step 1 — Creating the Network Infrastructure for IoT Automation
Creating a VPC Network for Secure MQTT Communication
First, you'll create a Virtual Private Cloud (VPC) to ensure secure communication between your IoT services and MQTT broker, without the need for public access.
- Log in to your cloud provider control panel
- Navigate to Networking → VPC from the left sidebar
- Click Create VPC Network
- Configure your VPC for IoT automation:
- Name: coreflux-integrations-vpc (or your VPC name)
- Datacenter region: Choose Frankfurt (or your preferred region)
- IP Range: Use the default or configure as needed
- Description: Add a meaningful description for your MQTT broker and Databases network
- Click Create VPC Network
The VPC will provide isolated networking for all your IoT resources, ensuring secure communication between the Coreflux MQTT broker and managed databases. For more details on VPC configuration, see our guide on creating VPC networks.
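As a quick sanity check on the isolation described above, the following Python sketch tests whether an address falls inside a VPC's IP range, using only the standard `ipaddress` module. The range `10.114.0.0/20` and the function name `is_in_vpc` are hypothetical; substitute the IP range shown for your own VPC.

```python
import ipaddress

# Hypothetical VPC range; replace with the IP range shown for your VPC.
VPC_RANGE = ipaddress.ip_network("10.114.0.0/20")


def is_in_vpc(address: str, vpc=VPC_RANGE) -> bool:
    """Return True if the address belongs to the VPC's private range."""
    return ipaddress.ip_address(address) in vpc


print(is_in_vpc("10.114.0.5"))    # a private address inside the assumed range
print(is_in_vpc("203.0.113.10"))  # a documentation-only public address
```

A check like this can help confirm that the host you configure in a database route is the private (VPC) one rather than the public one.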
Step 2 — Setting Up Managed Database for Scalable Storage
Choose one of the following database options based on your IoT application requirements:
- PostgreSQL: Ideal for structured data requiring relational queries, ACID compliance, and complex relationships
- MySQL: Great for structured workloads and transactional queries with strong consistency and widespread tooling support
- MongoDB: Perfect for flexible document storage with varying schemas and rapid development
- OpenSearch: Excellent for advanced search, analytics, log analysis, and time-series data visualization
Setting Up PostgreSQL Managed Database
Managed PostgreSQL on the cloud provider is a good fit when your IoT workloads need relational schemas, strong consistency, and advanced SQL analytics, backed by automated backups, monitoring, and maintenance.
- From the cloud provider control panel, navigate to Databases
- Click Create Database Cluster
- Configure your PostgreSQL cluster for IoT automation:
- Database engine: Select PostgreSQL
- Version: Choose the latest stable version
- Datacenter region: Select Frankfurt (same as your VPC)
- VPC Network: Select the coreflux-integrations-vpc you created
- Database cluster name: postgresql-coreflux-test
- Project: Select your target project
- Choose your plan based on your IoT requirements:
- For development: Basic plan with 1 GB RAM
- For production: General Purpose or higher for scalable storage
- Click Create Database Cluster
The managed database creation process typically takes 1-5 minutes. Once complete, you'll be redirected to the database overview page, where you can see the connection details and perform administrative actions.
Configuring PostgreSQL Database Access for MQTT Broker Integration
You'll be prompted with Getting Started steps, where your connection details are shown and you can configure the inbound access rules (recommended to limit to your IP and VPC-only).
- Click Get Started to configure your PostgreSQL database
- [Optional] Restrict inbound connections:
- Add your local computer's IP for management access
- The droplet will be automatically allowed through VPC networking
For connection details, you'll be able to see two options – Public Network and VPC Network. The first is for external access for tools like DBeaver, while the second will be used by the Coreflux service to access the database.
- Note the connection details provided, both for public access and for VPC access (distinct details for each):
- Host: Your database hostname
- User: Default admin user
- Password: Auto-generated secure password
- Database: Authentication database name
Testing PostgreSQL Database Connection
You can test the PostgreSQL connection using DBeaver with the provided connection parameters, using the public access credentials.
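If you prefer a single connection URI over separate fields, the sketch below assembles one from the connection details in Python. The host, port, user, and database are the placeholder values used later in this tutorial, the password is made up, and `build_postgres_uri` is an illustrative helper rather than part of any library. Credentials are percent-encoded so that characters such as `@` or `/` in a generated password do not break the URI.

```python
from urllib.parse import quote


def build_postgres_uri(host: str, port: int, user: str, password: str,
                       database: str, sslmode: str = "require") -> str:
    """Assemble a libpq-style connection URI, percent-encoding the credentials."""
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}?sslmode={sslmode}"
    )


# Placeholder values; copy the real ones from your cluster's connection details.
uri = build_postgres_uri(
    host="db-postgresql.db.onmyserver.com",
    port=25060,
    user="doadmin",
    password="p@ss/word",
    database="defaultdb",
)
print(uri)
```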
Creating PostgreSQL Application Database and User (Optional)
For better security and organization, create a dedicated user and database for your IoT automation application. This can also be done through DBeaver or CLI, but the cloud provider provides a user-friendly approach:
- Go to Users & Databases tab in your managed database cluster
- Create User:
- Username: coreflux-broker-client
- Password: Autogenerated
- Create Database:
- Database name: coreflux-broker-data
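Note: You may need to adjust the user's permissions so that it can create tables and insert and select data. Because these names contain hyphens, they must be quoted as identifiers. For PostgreSQL, INSERT and SELECT are table-level privileges rather than database-level ones, so grant database access with GRANT CONNECT, CREATE ON DATABASE "coreflux-broker-data" TO "coreflux-broker-client"; and then, inside that database, grant CREATE on the schema and INSERT and SELECT on the tables the broker writes to. For MySQL, use GRANT CREATE, INSERT, SELECT ON `coreflux-broker-data`.* TO 'coreflux-broker-client'@'%';. See our PostgreSQL quickstart and MySQL documentation for more details.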
Setting Up MySQL Managed Database
Managed MySQL on the cloud provider is ideal for structured, transactional IoT data where you want familiar SQL, broad ecosystem support, and a fully managed service handling backups, updates, and monitoring.
- From the cloud provider control panel, navigate to Databases
- Click Create Database Cluster
- Configure your MySQL cluster for IoT automation:
- Database engine: Select MySQL
- Version: Choose the latest stable version
- Datacenter region: Select Frankfurt (same as your VPC)
- VPC Network: Select the coreflux-integrations-vpc you created
- Database cluster name: mysql-coreflux-test
- Project: Select your target project
- Choose your plan based on your IoT requirements:
- For development: Basic plan with 1 GB RAM
- For production: General Purpose or higher for scalable storage
- Click Create Database Cluster
The managed database creation process typically takes 1-5 minutes. Once complete, you'll be redirected to the database overview page, where you can see the connection details and perform administrative actions.
Configuring MySQL Database Access for MQTT Broker Integration
You'll be prompted with Getting Started steps, where your connection details are shown and you can configure the inbound access rules (recommended to limit to your IP and VPC-only).
- Click Get Started to configure your MySQL database
- [Optional] Restrict inbound connections:
- Add your local computer's IP for management access
- The droplet will be automatically allowed through VPC networking
For connection details, you'll be able to see two options – Public Network and VPC Network. The first is for external access for tools like DBeaver, while the second will be used by the Coreflux service to access the database.
- Note the connection details provided, both for public access and for VPC access (distinct details for each):
- Host: Your database hostname
- User: Default admin user
- Password: Auto-generated secure password
- Database: Authentication database name
Testing MySQL Database Connection
You can test the MySQL connection using DBeaver with the provided connection parameters, using public access credentials.
Note: You may need to change DBeaver’s driver settings — set allowPublicKeyRetrieval = true.
Creating MySQL Application Database and User (Optional)
For better security and organization, create a dedicated user and database for your IoT automation application. This can also be done through DBeaver or CLI, but the cloud provider provides a user-friendly approach:
- Go to Users & Databases tab in your managed database cluster
- Create User:
- Username: coreflux-broker-client
- Password: Autogenerated
- Create Database:
- Database name: coreflux-broker-data
Setting Up MongoDB Managed Database
Managed MongoDB on the cloud provider is well-suited to flexible or evolving IoT payloads, letting you store heterogeneous sensor documents without rigid schemas, while the platform handles replication, backups, and monitoring.
- From the cloud provider control panel, navigate to Databases
- Click Create Database Cluster
- Configure your MongoDB cluster for IoT automation:
- Database engine: Select MongoDB
- Version: Choose the latest stable version
- Datacenter region: Select Frankfurt (same as your VPC)
- VPC Network: Select the coreflux-integrations-vpc you created
- Database cluster name: mongodb-coreflux-test
- Project: Select your target project
- Choose your plan based on your IoT requirements:
- For development: Basic plan with 1 GB RAM
- For production: General Purpose or higher for scalable storage
- Click Create Database Cluster
The managed database creation process typically takes 1-5 minutes. Once complete, you'll be redirected to the database overview page, where you can see the connection details and perform administrative actions.
Configuring MongoDB Database Access for MQTT Broker Integration
You'll be prompted with Getting Started steps, where your connection details are shown and you can configure the inbound access rules (recommended to limit to your IP and VPC-only).
- Click Get Started to configure your MongoDB database
- [Optional] Restrict inbound connections:
- Add your local computer's IP for management access
- The droplet will be automatically allowed through VPC networking
For connection details, you'll be able to see two options: Public Network and VPC Network. The first is for external access for tools like MongoDB Compass, while the second will be used by the Coreflux service to access the database.
- Note the connection details provided, both for public access and for VPC access (distinct details for each):
- Host: Your database hostname
- User: Default admin user
- Password: Auto-generated secure password
- Database: Authentication database name
Testing MongoDB Database Connection
You can test the MongoDB connection using MongoDB Compass or the provided connection string, using public access credentials:
mongodb://username:password@mongodb-host:27017/defaultauthdb?ssl=true
Creating MongoDB Application Database and User (Optional)
For better security and organization, create a dedicated user and database for your IoT automation application. This can also be done through MongoDB Compass or CLI, but the cloud provider provides a user-friendly approach:
- Go to Users & Databases tab in your managed database cluster
- Create User:
- Username: coreflux-broker-client
- Password: Autogenerated
- Create Database:
- Database name: coreflux-broker-data
Setting Up OpenSearch Managed Database
Managed OpenSearch on the cloud provider is designed for search, log analytics, and time-series dashboards over high-volume IoT data, with the service managing cluster health, scaling, and index storage for you.
- From the cloud provider control panel, navigate to Databases
- Click Create Database Cluster
- Configure your OpenSearch cluster for IoT automation:
- Database engine: Select OpenSearch
- Version: Choose the latest stable version
- Datacenter region: Select Frankfurt (same as your VPC)
- VPC Network: Select the coreflux-integrations-vpc you created
- Database cluster name: opensearch-coreflux-test
- Project: Select your target project
- Choose your plan based on your IoT requirements:
- For development: Basic plan with 1 GB RAM
- For production: General Purpose or higher for scalable storage
- Click Create Database Cluster
The managed database creation process typically takes 1-5 minutes. Once complete, you'll be redirected to the database overview page, where you can see the connection details and perform administrative actions.
Configuring OpenSearch Database Access for MQTT Broker Integration
You'll be prompted with Getting Started steps, where your connection details are shown and you can configure the inbound access rules (recommended to limit to your IP and VPC-only).
- Click Get Started to configure your OpenSearch database
- [Optional] Restrict inbound connections:
- Add your local computer's IP for management access
- The droplet will be automatically allowed through VPC networking
For connection details, you'll be able to see two options: Public Network and VPC Network. The first is for external access for tools, while the second will be used by the Coreflux service to access the database. You'll also see the URL and parameters to access the OpenSearch Dashboards.
- Note the connection details provided, both for public access and for VPC access (distinct details for each):
- Host: Your database hostname
- User: Default admin user
- Password: Auto-generated secure password
- Database: Authentication database name
Testing OpenSearch Database Connection
You can test the OpenSearch connection using the OpenSearch Dashboards with the provided credentials.
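As an alternative to the Dashboards, you can query the cluster's REST API directly. The sketch below prepares (but does not send) an authenticated request to the `_cluster/health` endpoint using Python's standard library. The base URL and credentials are the placeholders used later in this tutorial, and `build_health_request` is an illustrative helper name.

```python
import base64
import urllib.request


def build_health_request(base_url: str, user: str, password: str) -> urllib.request.Request:
    """Prepare (but do not send) a GET request for the cluster health endpoint."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/_cluster/health",
        headers={"Authorization": f"Basic {token}"},
        method="GET",
    )


# Placeholder host and credentials; use the values from your cluster's details page.
request = build_health_request("https://my-opensearch-cluster:9200", "myuser", "mypassword")
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request, timeout=10)
```

Building the request separately from sending it makes it easy to inspect the URL and headers before any credentials leave your machine.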
Step 3 — Deploying the Coreflux MQTT Broker on cloud servers
Creating the cloud server
- Navigate to Droplets in your cloud provider control panel
- Click Create Droplet
- Configure your droplet for MQTT broker deployment:
- Choose Region: Frankfurt (same as your managed database)
- VPC Network: Select coreflux-integrations-vpc
- Choose an image: Go to Marketplace tab
- Search for "Coreflux" and select Coreflux from the Marketplace
- Choose Size for your IoT workload:
- For development: Basic plan with 2 GB memory
- For production: Basic or General Purpose plan with 4+ GB memory for scalable performance
- Choose Authentication Method:
- SSH Key: Recommended for improved security
- A key can be created locally using ssh-keygen
- Password: Alternative option
- Finalize Details:
- Hostname: coreflux-test-broker
- Project: Select your project
- Tags: Add relevant tags for DevOps organization
- Click Create Droplet
- See the Droplet Home Page and wait for it to finish deploying
Alternative - Installing Coreflux MQTT Broker with Docker on Docker Image Droplet
Using the same approach as for Coreflux Droplet, select Docker as the Marketplace image.
Once your droplet is running, connect to it via SSH with the defined authentication method or the web console available in the Droplet home page:
ssh root@your-droplet-ip
Run the Coreflux MQTT broker using Docker:
docker run -d \
--name coreflux \
-p 1883:1883 \
-p 1884:1884 \
-p 5000:5000 \
-p 443:443 \
coreflux/coreflux-mqtt-broker-t:1.6.3
This Docker command:
- Runs the container in detached mode (-d)
- Names the container coreflux
- Exposes necessary ports for MQTT and web interface
- Uses the Coreflux image pinned to version 1.6.3
Verify the MQTT broker is running:
docker ps
You should see the coreflux container listed as running.
Validating the MQTT Broker deployment by connecting to it with default values
You can connect to the broker with an MQTT client such as MQTT Explorer to validate access, regardless of the approach you took to deploy it.
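Before opening an MQTT client, it can help to confirm that the broker's ports accept TCP connections at all. The Python sketch below probes the four ports published by the Docker command earlier in this step; the port labels follow this tutorial's firewall section, and the function names are illustrative. It only checks reachability and does not speak MQTT.

```python
import socket

# Ports published by the Docker command earlier in this step.
BROKER_PORTS = {
    1883: "MQTT",
    1884: "MQTT with TLS",
    5000: "MQTT over WebSocket",
    443: "MQTT over WebSocket with TLS",
}


def port_is_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_broker(host: str) -> dict:
    """Probe every broker port and report which ones accept connections."""
    return {port: port_is_open(host, port) for port in BROKER_PORTS}


# Example (replace the placeholder with your droplet's address):
# print(check_broker("your-droplet-ip"))
```

A port that reports False may be blocked by a firewall rule rather than indicating a broker problem, so check both.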
Step 4 — Configuring Firewall Rules for Secure IoT Communication (Optional)
For production IoT automation deployments, configure firewall rules to restrict access:
- Navigate to Networking → Firewalls
- Click Create Firewall
- Configure inbound rules for MQTT broker security:
- SSH: Port 22 from your IP
- MQTT: Port 1883 from your IoT application sources
- MQTT with TLS: Port 1884 for secure MQTT with TLS
- WebSocket: Port 5000 for MQTT through WebSocket
- WebSocket with TLS: Port 443 for MQTT through WebSocket with TLS
- Apply the firewall to your droplet
For detailed firewall configuration, refer to the cloud provider's firewall quickstart guide. Production tip: Restrict MQTT port 1883 to specific source IPs or VPC ranges only, and prefer port 1884 (MQTT with TLS) for external device connections. Consider using an app platform with private networking if you need additional security layers.
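The production tip above can be turned into a simple audit. The following Python sketch models inbound rules as plain data and flags any rule that accepts traffic from every address. The `InboundRule` class, the sample addresses, and the VPC range are all hypothetical; this does not call any cloud provider API.

```python
import ipaddress
from dataclasses import dataclass


@dataclass(frozen=True)
class InboundRule:
    port: int
    description: str
    sources: tuple  # CIDR ranges allowed to connect


def open_to_world(rules: list) -> list:
    """Return the rules that allow traffic from any IPv4 or IPv6 address."""
    exposed = []
    for rule in rules:
        # A prefix length of 0 (0.0.0.0/0 or ::/0) matches every address.
        if any(ipaddress.ip_network(src).prefixlen == 0 for src in rule.sources):
            exposed.append(rule)
    return exposed


# Hypothetical addresses: 198.51.100.7 stands in for your own IP,
# 10.114.0.0/20 for your VPC range.
rules = [
    InboundRule(22, "SSH", ("198.51.100.7/32",)),
    InboundRule(1883, "MQTT", ("10.114.0.0/20",)),
    InboundRule(1884, "MQTT with TLS", ("0.0.0.0/0", "::/0")),
]
for rule in open_to_world(rules):
    print(f"port {rule.port} ({rule.description}) accepts connections from anywhere")
```

In this sample only the TLS port is open to external devices, while plain MQTT stays restricted to the VPC range.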
Step 5 — Setting Up IoT Data Integration with Coreflux's Language of Things
Installing the LoT Notebook Extension
The LoT (Language of Things) Notebook extension for Visual Studio Code provides an integrated low code development environment for MQTT broker programming and IoT automation. Learn more about Coreflux's Language of Things (LoT) for low-code IoT automation.
- Open Visual Studio Code
- Go to Extensions (Ctrl+Shift+X)
- Search for "LoT Notebooks"
- Install the LoT VSCode Notebooks Extension by Coreflux
Connecting to Your MQTT Broker
When prompted in the top bar, or after clicking the MQTT button on the left of the bottom bar, configure the connection to your Coreflux MQTT broker using the default credentials:
- User: root
- Password: coreflux
Assuming no errors, you'll see the status of the MQTT connectivity to the broker in the bottom bar, on the left.
Step 6 — Creating Data in MQTT Broker through Actions
For this use case, we will build an integration that takes raw data through a transformation pipeline and into a database. Because no MQTT devices are connected in this demo, we will use a LoT Action to simulate device data.
In LoT, an Action is executable logic triggered by specific events such as timed intervals, topic updates, or explicit calls from other actions or system components. Actions allow dynamic interaction with MQTT topics, internal variables, and payloads, facilitating complex IoT automation workflows.
Here, the Action publishes data to specific topics at a fixed time interval, and the rest of the pipeline defined below consumes that data.
You can download the GitHub repo with the sample project.
Generating Simulated IoT Data
Create an Action to generate simulated sensor data using the low code LoT (Language of Things) interface:
DEFINE ACTION RANDOMIZEMachineData
ON EVERY 10 SECONDS DO
PUBLISH TOPIC "raw_data/machine1" WITH RANDOM BETWEEN 0 AND 10
PUBLISH TOPIC "raw_data/station2" WITH RANDOM BETWEEN 0 AND 60
The provided Notebook also includes an alternative Action that simulates data with an incrementing counter.
When you run this Action, it will:
- Deploy automatically to the MQTT broker
- Generate simulated IoT sensor data every 10 seconds
- Publish real-time data to specific MQTT topics
- Show sync status in the LoT Notebook interface
- This status shows if the code on the LoT Notebook differs from the one running in the broker, or if it is missing entirely
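To reason about the values the Action produces without a broker, the Python sketch below generates one batch of readings with the same topics and bounds. It assumes integer values with inclusive bounds, which may not match how LoT's RANDOM BETWEEN behaves, and the names `SIMULATED_TOPICS` and `generate_readings` are illustrative.

```python
import random

# Topic -> (low, high) bounds, mirroring the two PUBLISH lines in the Action.
SIMULATED_TOPICS = {
    "raw_data/machine1": (0, 10),
    "raw_data/station2": (0, 60),
}


def generate_readings(rng: random.Random) -> dict:
    """Produce one batch of readings, as the Action would every 10 seconds."""
    return {
        topic: rng.randint(low, high)  # assumption: integer values, bounds inclusive
        for topic, (low, high) in SIMULATED_TOPICS.items()
    }


readings = generate_readings(random.Random(42))  # seeded so runs are repeatable
for topic, value in readings.items():
    print(topic, value)
```

Note the different ranges: only raw_data/station2 can exceed 50, which matters for the maintenance threshold used in the next step.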
Step 7 — Creating Data Transformation Models for Real-Time Processing
Defining Data Models with Language of Things
Models in Coreflux are used to transform, aggregate, and compute values from input MQTT topics, publishing the results to new topics. They serve as the foundation for the Unified Namespace (UNS) of your system, applicable across your data sources.
A Model lets you define how raw IoT data should be structured and transformed, either for a single device or for multiple devices simultaneously (through the wildcard +). A Model also serves as the data schema used when storing data in the managed database.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
This low code model:
- Uses wildcard + to apply to all machines automatically
- Converts energy to watt-hours (energy_wh) by multiplying by 1000
- Determines production status based on energy thresholds
- Tracks production counts and stoppage events
- Adds timestamps to all real-time data points
- Extracts machine ID from the topic structure
- Publishes structured data to the Simulator/Machine/+/Data topics (replacing the + with the matching segment of each trigger/source topic)
Because the Action simulates two sensors/machines, the Model structure is applied automatically to both, generating both a JSON object and the individual topics.
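The Model's rules can be read as a small function from one energy reading to a structured record. The Python sketch below mirrors those rules so you can predict what the broker should publish; it is an illustration, not how Coreflux evaluates Models. The function name `apply_machine_model`, the explicit `previous_count` argument, and the ISO 8601 timestamp format are assumptions.

```python
from datetime import datetime, timezone


def apply_machine_model(energy: float, previous_count: int = 0) -> dict:
    """Compute the Model's derived fields for one incoming energy reading."""
    status = "active" if energy > 5 else "inactive"
    return {
        "energy": energy,
        "energy_wh": energy * 1000,
        "production_status": status,
        # The count grows while the machine stays active and resets otherwise.
        "production_count": previous_count + 1 if status == "active" else 0,
        "stoppage": 1 if status == "inactive" else 0,
        "maintenance_alert": energy > 50,
        "timestamp": datetime.now(timezone.utc).isoformat(),  # assumed format
    }


record = apply_machine_model(energy=7, previous_count=2)
print(record["production_status"], record["production_count"])
```

With an energy reading of 7 and a previous count of 2, the record is active with a production count of 3; a reading of 5 or below would reset the count to 0 and mark a stoppage.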
Step 8 — Setting Up Database Integration for Scalable Storage
Choose the database integration section that matches your selected database from Step 2.
PostgreSQL Integration
In this section, you'll learn how to store your processed IoT data in a PostgreSQL managed database on the cloud provider.
To store your processed IoT data in a PostgreSQL managed database, you'll define a Route in Coreflux. Routes specify how data is sent from your MQTT broker to your PostgreSQL cluster using a simple, low-code configuration:
DEFINE ROUTE PostgreSQL_Log WITH TYPE POSTGRESQL
ADD SQL_CONFIG
WITH SERVER "db-postgresql.db.onmyserver.com"
WITH PORT 25060
WITH DATABASE "defaultdb"
WITH USERNAME "doadmin"
WITH PASSWORD "AVNS_pass"
WITH USE_SSL TRUE
WITH TRUST_SERVER_CERTIFICATE FALSE
Replace with your PostgreSQL connection details from the cloud provider and run the Route in your LoT Notebook. Important: Use the VPC connection details (not public) for better security and lower latency. The VPC hostname and port differ from the public connection string—check your database cluster's connection details page for both options.
Updating the Model for PostgreSQL Database Storage
Modify your LoT model to use the database route for scalable storage, by adding this to the end of the Model:
STORE IN "PostgreSQL_Log"
WITH TABLE "MachineProductionData"
Additionally, add a parameter with the topic, to have a unique identifier for each entry in your managed database.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "PostgreSQL_Log"
WITH TABLE "MachineProductionData"
After you deploy this updated Model, each new value should be stored in the database automatically.
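The device_name field is derived from the trigger topic. The Python sketch below mimics that idea outside the broker: it picks one level out of a topic and substitutes it into the Model's output topic pattern. It assumes topic positions are counted from 1, which is a guess about LoT's TOPIC POSITION semantics rather than something confirmed here, and both function names are illustrative.

```python
def topic_segment(topic: str, position: int) -> str:
    """Return the topic level at a 1-based position (position 2 of 'a/b' is 'b')."""
    levels = topic.split("/")
    if not 1 <= position <= len(levels):
        raise IndexError(f"topic {topic!r} has no level {position}")
    return levels[position - 1]


def output_topic(pattern: str, device: str) -> str:
    """Substitute the single-level wildcard in the Model's topic pattern."""
    return pattern.replace("+", device, 1)


device = topic_segment("raw_data/machine1", 2)
print(device, output_topic("Simulator/Machine/+/Data", device))
```

Storing this value alongside each record lets you tell rows from different machines apart when they share one table.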
MySQL Integration
MySQL is a widely used relational database management system, making it an excellent choice for storing and analyzing IoT data at scale. In this section, you'll learn how to connect your Coreflux MQTT broker to a managed MySQL database on the cloud provider, so your real-time device data is securely and reliably persisted for analytics, reporting, or integration with other applications.
To enable this integration, you must define a Route in Coreflux's LoT (Language of Things) that instructs where and how the processed data should be sent. Below is the required low-code format for routing data to a MySQL database. Be sure to substitute your own connection details as needed:
DEFINE ROUTE MySQL_Log WITH TYPE MYSQL
ADD SQL_CONFIG
WITH SERVER "db-mysql.db.onmyserver.com"
WITH PORT 25060
WITH DATABASE "defaultdb"
WITH USERNAME "doadmin"
WITH PASSWORD "AVNS_pass"
WITH USE_SSL TRUE
WITH TRUST_SERVER_CERTIFICATE FALSE
Replace with your MySQL connection details from the cloud provider and run the Route in your LoT Notebook. Important: Use the VPC connection details (not public) for better security and lower latency. If you encounter connection issues, verify that TRUST_SERVER_CERTIFICATE is set correctly for your MySQL version—some versions require TRUE while others work with FALSE.
Updating the Model for MySQL Database Storage
Modify your LoT model to use the database route for scalable storage, by adding this to the end of the Model:
STORE IN "MySQL_Log"
WITH TABLE "MachineProductionData"
Additionally, add a parameter with the topic, to have a unique identifier for each entry in your managed database.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "MySQL_Log"
WITH TABLE "MachineProductionData"
After you deploy this updated Model, each new value should be stored in the database automatically.
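To picture what a stored row might look like, the sketch below inserts one Model record into a table named MachineProductionData. It uses Python's built-in sqlite3 module purely as a local stand-in for the managed database. The column layout (one column per Model field) and the sample values are assumptions, and the table Coreflux creates may use different names and types.

```python
import sqlite3

# Assumed column layout: one column per Model field.
COLUMNS = ("device_name", "energy", "energy_wh", "production_status",
           "production_count", "stoppage", "maintenance_alert", "timestamp")
COLUMN_LIST = ", ".join(COLUMNS)


def store_record(conn: sqlite3.Connection, record: dict) -> None:
    """Insert one Model record using a parameterized statement."""
    placeholders = ", ".join("?" for _ in COLUMNS)
    conn.execute(
        f'INSERT INTO "MachineProductionData" ({COLUMN_LIST}) VALUES ({placeholders})',
        [record[column] for column in COLUMNS],
    )


conn = sqlite3.connect(":memory:")  # in-memory stand-in for the managed database
conn.execute(f'CREATE TABLE "MachineProductionData" ({COLUMN_LIST})')
store_record(conn, {
    "device_name": "machine1", "energy": 7, "energy_wh": 7000,
    "production_status": "active", "production_count": 1, "stoppage": 0,
    "maintenance_alert": False, "timestamp": "2025-01-01T00:00:00+00:00",  # sample value
})
row_count = conn.execute('SELECT COUNT(*) FROM "MachineProductionData"').fetchone()[0]
print(row_count)
```

Once real data is flowing, a similar SELECT against the managed database is a quick way to confirm that rows are arriving.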
MongoDB Integration
MongoDB is a NoSQL database that is well-suited for storing and querying IoT data with flexible schemas. In this section, you'll learn how to connect your Coreflux MQTT broker to a managed MongoDB database on the cloud provider, so your real-time device data is securely and reliably persisted for analytics, reporting, or integration with other applications.
To enable this integration, you must define a Route in Coreflux's LoT (Language of Things) that instructs where and how the processed data should be sent. Below is the required low-code format for routing data to a MongoDB database. Be sure to substitute your own connection details as needed:
DEFINE ROUTE mongo_route WITH TYPE MONGODB
ADD MONGODB_CONFIG
WITH CONNECTION_STRING "mongodb+srv://<username>:<password>@<cluster-uri>/<database>?tls=true&authSource=admin&replicaSet=<replica-set>"
WITH DATABASE "admin"
Replace with your MongoDB connection details from the cloud provider and run the Route in your LoT Notebook. Important: Use the VPC connection string format when available. The connection string should include tls=true and authSource=admin parameters. For troubleshooting MongoDB connections, see our guide on connecting to MongoDB.
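Since a missing option is a common cause of connection failures, the Python sketch below checks a connection string for the two parameters mentioned above before you paste it into the Route. The function name `check_mongo_uri` and the sample URIs are illustrative; the check inspects only the text of the string and never contacts the database.

```python
from urllib.parse import urlsplit, parse_qs


def check_mongo_uri(uri: str) -> list:
    """Return a list of problems found in a MongoDB connection string."""
    problems = []
    parts = urlsplit(uri)
    if parts.scheme not in ("mongodb", "mongodb+srv"):
        problems.append(f"unexpected scheme: {parts.scheme!r}")
    options = parse_qs(parts.query)
    if options.get("tls") != ["true"]:
        problems.append("tls=true is missing")
    if options.get("authSource") != ["admin"]:
        problems.append("authSource=admin is missing")
    return problems


# Made-up host and credentials, for illustration only.
good = "mongodb+srv://doadmin:secret@cluster.example.com/admin?tls=true&authSource=admin"
bad = "mongodb+srv://doadmin:secret@cluster.example.com/admin?authSource=admin"
print(check_mongo_uri(good))
print(check_mongo_uri(bad))
```

An empty list means both options are present; anything else names what to fix.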
Updating the Model for MongoDB Database Storage
Modify your LoT model to use the database route for scalable storage, by adding this to the end of the Model:
STORE IN "mongo_route"
WITH TABLE "MachineProductionData"
Additionally, add a parameter with the topic, to have a unique identifier for each entry in your managed database.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "mongo_route"
WITH TABLE "MachineProductionData"
After you deploy this updated Model, each new value should be stored in the database automatically.
OpenSearch Integration
OpenSearch is a distributed search and analytics engine designed for large-scale data processing and real-time analytics. In this section, you'll learn how to connect your Coreflux MQTT broker to a managed OpenSearch database on the cloud provider, so your real-time device data is securely and reliably persisted for analytics, reporting, or integration with other applications.
To enable this integration, you must define a Route in Coreflux's LoT (Language of Things) that instructs where and how the processed data should be sent. Below is the required low-code format for routing data to an OpenSearch database. Be sure to substitute your own connection details as needed:
DEFINE ROUTE OpenSearch_Log WITH TYPE OPENSEARCH
ADD OPENSEARCH_CONFIG
WITH BASE_URL "https://my-opensearch-cluster:9200"
WITH USERNAME "myuser"
WITH PASSWORD "mypassword"
WITH USE_SSL TRUE
WITH IGNORE_CERT_ERRORS FALSE
Replace with your OpenSearch connection details from the cloud provider and run the Route in your LoT Notebook. Important: Use the VPC base URL (not public) when available. The base URL format is typically https://your-cluster-hostname:9200. For OpenSearch Dashboards access, use the separate Dashboards URL provided in your database cluster details. See our OpenSearch quickstart for more details.
Updating the Model for OpenSearch Database Storage
Modify your LoT model to use the database route for scalable storage, by adding this to the end of the Model:
STORE IN "OpenSearch_Log"
WITH TABLE "MachineProductionData"
Additionally, add a parameter with the topic, to have a unique identifier for each entry in your managed database.
DEFINE MODEL MachineData WITH TOPIC "Simulator/Machine/+/Data"
ADD "energy" WITH TOPIC "raw_data/+" AS TRIGGER
ADD "device_name" WITH REPLACE "+" WITH TOPIC POSITION 2 IN "+"
ADD "energy_wh" WITH (energy * 1000)
ADD "production_status" WITH (IF energy > 5 THEN "active" ELSE "inactive")
ADD "production_count" WITH (IF production_status EQUALS "active" THEN (production_count + 1) ELSE 0)
ADD "stoppage" WITH (IF production_status EQUALS "inactive" THEN 1 ELSE 0)
ADD "maintenance_alert" WITH (IF energy > 50 THEN TRUE ELSE FALSE)
ADD "timestamp" WITH TIMESTAMP "UTC"
STORE IN "OpenSearch_Log"
WITH TABLE "MachineProductionData"
After you deploy this updated model, every processed update is automatically stored in the database.
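To reason about what the model should write for a given reading, the field rules can be mirrored in plain Python. This is only a sketch of one reading of the LoT model above, not how Coreflux evaluates it: the function name `model_fields`, the `previous_count` argument, and the timestamp format (copied from the sample MongoDB document in this tutorial) are assumptions.

```python
from datetime import datetime, timezone


def model_fields(energy, previous_count, device_name, now=None):
    """Mirror the MachineData model rules for a single energy reading."""
    now = now or datetime.now(timezone.utc)
    status = "active" if energy > 5 else "inactive"
    return {
        "energy": energy,
        "device_name": device_name,
        "energy_wh": energy * 1000,
        "production_status": status,
        # The counter increments while active and resets to 0 when inactive.
        "production_count": previous_count + 1 if status == "active" else 0,
        "stoppage": 1 if status == "inactive" else 0,
        "maintenance_alert": energy > 50,
        # Assumed format, based on the sample document in this tutorial.
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
    }


print(model_fields(36, 30, "station2"))
```

Comparing this output with what lands in your database is a quick way to spot a mistyped threshold or field name in the model.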
Step 9 — Verifying the Complete IoT Automation Pipeline
Monitoring Real-Time Data Flow
- MQTT Explorer: Use an MQTT client to verify real-time data publication
- Database Client: Connect to verify the storage of data (DBeaver for PostgreSQL or MySQL, MongoDB Compass for MongoDB, or OpenSearch Dashboards for OpenSearch)
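When subscribing with an MQTT client, it helps to know which topics a filter such as Simulator/Machine/+/Data will actually match. The sketch below implements the standard MQTT wildcard rules (`+` for one level, `#` for all remaining levels) in Python. The function name and the concrete topic `Simulator/Machine/station2/Data` are illustrative assumptions, and the special handling of topics starting with `$` is left out.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level and matches everything below.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without '#', both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


# Hypothetical topic name, used only for illustration.
print(topic_matches("Simulator/Machine/+/Data", "Simulator/Machine/station2/Data"))
```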
Verifying PostgreSQL Storage
Connect to your PostgreSQL managed database using DBeaver to verify that records are being stored:
- Use the connection string from your cloud provider database
- Navigate to the coreflux-broker-data database (or the name you gave to the database)
- Check the MachineProductionData table for stored records
As we've seen before, all of the data is available in the MQTT Broker for other uses and integrations.
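Once connected, a summary query per device is a quick way to confirm that rows keep arriving. The sketch below runs such a query against an in-memory SQLite table as a local stand-in, so it can be tried without a database connection. The column names are assumed to mirror the model fields, and whether Coreflux creates the table with exactly this name and these columns is an assumption to check in your client. The double-quoted identifiers are used so the same SELECT should also work in PostgreSQL if the table name is stored in mixed case.

```python
import sqlite3

# Local stand-in for the managed database; table and column names are assumptions.
conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE "MachineProductionData" ('
    'device_name TEXT, energy REAL, production_status TEXT, "timestamp" TEXT)'
)
conn.executemany(
    'INSERT INTO "MachineProductionData" VALUES (?, ?, ?, ?)',
    [
        ("station1", 4.0, "inactive", "2025-06-30 10:58:10"),
        ("station2", 36.0, "active", "2025-06-30 10:58:11"),
        ("station2", 12.0, "active", "2025-06-30 10:58:05"),
    ],
)

# Latest timestamp and row count per device.
LATEST_PER_DEVICE = """
SELECT device_name, MAX("timestamp") AS last_seen, COUNT(*) AS row_count
FROM "MachineProductionData"
GROUP BY device_name
ORDER BY device_name
"""

rows = conn.execute(LATEST_PER_DEVICE).fetchall()
for row in rows:
    print(row)
```

If `last_seen` stops advancing for a device while it is still publishing, the route or the model is the first place to look.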
Verifying MongoDB Storage
Connect to your MongoDB managed database using MongoDB Compass to verify that documents are being stored:
- Use the connection string from your cloud provider database
- Navigate to the coreflux-broker-data database (or the name you gave to the database)
- Check the MachineProductionData collection for stored documents
You should see real-time data documents with structure similar to:
{
"_id": {
"$oid": "68626dc3e8385cbe9a1666c3"
},
"energy": 36,
"energy_wh": 36000,
"production_status": "active",
"production_count": 31,
"stoppage": 0,
"maintenance_alert": false,
"timestamp": "2025-06-30 10:58:11",
"device_name": "station2"
}
As we've seen before, all of the data is available in the MQTT Broker for other uses and integrations.
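A stored document can also be checked for internal consistency against the model rules. The following Python sketch parses the sample document shown above and reports any field that contradicts the thresholds in the model. The function name `consistency_problems` is hypothetical, and the checks reflect one reading of the model rather than an official validation tool.

```python
import json

# Sample document copied from the MongoDB Compass output above.
SAMPLE = """
{
  "_id": {"$oid": "68626dc3e8385cbe9a1666c3"},
  "energy": 36,
  "energy_wh": 36000,
  "production_status": "active",
  "production_count": 31,
  "stoppage": 0,
  "maintenance_alert": false,
  "timestamp": "2025-06-30 10:58:11",
  "device_name": "station2"
}
"""


def consistency_problems(doc):
    """Return the names of fields that contradict the model rules."""
    problems = []
    if doc["energy_wh"] != doc["energy"] * 1000:
        problems.append("energy_wh")
    if doc["production_status"] != ("active" if doc["energy"] > 5 else "inactive"):
        problems.append("production_status")
    if doc["stoppage"] != (1 if doc["production_status"] == "inactive" else 0):
        problems.append("stoppage")
    if doc["maintenance_alert"] != (doc["energy"] > 50):
        problems.append("maintenance_alert")
    if doc["production_status"] == "inactive" and doc["production_count"] != 0:
        problems.append("production_count")
    return problems


print(consistency_problems(json.loads(SAMPLE)))
```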
Verifying MySQL Storage
Connect to your MySQL managed database using DBeaver to verify that records are being stored:
- Use the connection string from your cloud provider database
- Navigate to the coreflux-broker-data database (or the name you gave to the database)
- Check the MachineProductionData table for stored records
As with the other integrations, all of the data is also available in the MQTT Broker for other uses and downstream integrations.
Verifying OpenSearch Storage
Open OpenSearch Dashboards with the provided URL and credentials:
- Open the Menu and select the Index Management option
- Select the Indexes option in the Menu and check that the index named after your table appears in the list
- Go back to the main page and select the Discover option in the Menu
- Create an Index Pattern by following the on-screen steps
- Go back to the Discover page, where you should see your data
As we've seen before, all of the data is available in the MQTT Broker for other uses and integrations.
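Outside of Dashboards, the same check can be scripted against the OpenSearch `_search` REST endpoint. The Python sketch below only builds the HTTP request, so it can be inspected without network access. The function name is hypothetical, the credentials are the placeholders from the Route example, and the index name is an assumption: OpenSearch index names are lowercase, so copy the exact name shown in the Index Management list.

```python
import base64
import json
import urllib.request


def build_search_request(base_url, index, username, password, size=5):
    """Build (but do not send) a request that fetches a few documents from an index."""
    body = {"size": size, "query": {"match_all": {}}}
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/{index}/_search",
        data=json.dumps(body).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
        method="POST",
    )


# Placeholder values; the index name is an assumption.
request = build_search_request(
    "https://my-opensearch-cluster:9200", "machineproductiondata", "myuser", "mypassword"
)
print(request.get_method(), request.full_url)
```

Passing the request to `urllib.request.urlopen(request)` from a machine inside the VPC would send it; the response body is JSON with the matching documents under `hits`.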
Step 10 - Expand Your Use Case and Integrations
Test LoT Capabilities
- Publish Sample Data: Use MQTT Explorer to publish sample datasets to your Coreflux broker. Experiment with different payload structures and different Models/Actions to see how they are processed and stored in your selected database.
- Data Validation: Verify that the data in your database matches the payloads you published. Check for consistency and accuracy using your database client (DBeaver for PostgreSQL or MySQL, MongoDB Compass for MongoDB, or OpenSearch Dashboards for OpenSearch), ensuring your IoT automation integration is working as expected. Compare timestamps, field transformations, and data types to validate your real-time data pipeline.
- Real-Time Monitoring: Set up a continuous real-time data feed using another MQTT data source, like a simple sensor with MQTT connectivity. Watch how Coreflux and your database handle incoming IoT data streams and explore response times for data retrieval and queries.
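For repeatable experiments, sample readings can be generated by a script instead of typed by hand. The Python sketch below produces topic and payload pairs that can be published with any MQTT client. The topic layout `raw_data/<device>` follows the model's trigger topic, while the device names, the 0 to 60 value range, and the plain-number payload format are assumptions.

```python
import random


def simulated_readings(devices, count, seed=None):
    """Yield (topic, payload) pairs for simulated energy readings."""
    rng = random.Random(seed)  # a fixed seed makes a test run repeatable
    for _ in range(count):
        device = rng.choice(devices)
        energy = round(rng.uniform(0, 60), 1)
        yield f"raw_data/{device}", str(energy)


# Hypothetical device names, for illustration only.
for topic, payload in simulated_readings(["station1", "station2"], 5, seed=1):
    print(topic, payload)
```

The range deliberately crosses both model thresholds (5 and 50), so the active, inactive, and maintenance-alert branches all get exercised.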
Build Analytics and Visualizations
- Create Dashboards: Integrate with visualization tools like Grafana to create dashboards that display your IoT data, pulling from both live MQTT topics and historical database queries. Track metrics like device uptime, sensor readings, production counts, or maintenance alerts from your automation systems. Learn how to set up monitoring with our guide on monitoring for managed databases with Prometheus and Grafana. For real-time dashboards, subscribe directly to MQTT topics; for historical trends and aggregates, query your database.
- Trend Analysis: Leverage your database's capabilities to analyze trends over time:
  - PostgreSQL: Use SQL queries for complex relational analysis
  - MongoDB: Use the aggregation framework for document-based analysis
  - OpenSearch: Use advanced analytics and search capabilities for full-text search and time-series analysis
- Multi-Database Integration: Explore integrating additional managed databases like MongoDB for unstructured data, PostgreSQL for relational data, MySQL for structured queries, or OpenSearch for advanced analytics and search capabilities. Use Coreflux routes to send data to multiple destinations simultaneously.
Optimize and Scale Your IoT Infrastructure
- Load Testing: Simulate high traffic by publishing many messages simultaneously using LoT Notebook or automated scripts. Monitor how your Coreflux MQTT broker and database cluster handle the load and identify any bottlenecks in your data pipeline.
- Scaling: The cloud provider offers vertical and horizontal scaling options. Increase Droplet resources (CPU, RAM, or storage) as your IoT data needs grow. Scale your managed database cluster to handle larger datasets, and configure alerts to notify you when you approach resource limits.
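A rough storage estimate helps decide when to scale the database cluster. The Python sketch below multiplies device count, message rate, and record size; all three input values in the example are illustrative assumptions, and real usage also depends on indexes, replication, and database overhead.

```python
SECONDS_PER_DAY = 86_400


def daily_storage_bytes(devices, messages_per_second, bytes_per_record):
    """Estimate raw bytes written per day, ignoring indexes and replication."""
    return devices * messages_per_second * SECONDS_PER_DAY * bytes_per_record


# Assumed workload: 100 devices, 1 message per second each, about 250 bytes per record.
estimate = daily_storage_bytes(100, 1, 250)
print(f"{estimate / 1e9:.2f} GB per day")
```

With these assumed inputs the estimate is 2.16 GB per day, which is the kind of figure to compare against your cluster's disk size and retention needs.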
Frequently Asked Questions
You integrate Coreflux MQTT broker with a managed database by defining a Route in LoT that points to your target service (PostgreSQL, MySQL, MongoDB, or OpenSearch). Each route uses the appropriate connection parameters (server or connection string, port, database name, username, password, and SSL/TLS options) and automatically persists MQTT message payloads into tables, collections, or indexes. Once the route is defined, you attach it to a Model with the STORE IN directive so that every processed message is written to your chosen database.
Yes. Coreflux is designed as a low-code integration layer, so you do not need to write application code or external ETL jobs to persist data. For each database type, you configure a LoT route (for example, PostgreSQL_Log, MySQL_Log, mongo_route, or OpenSearch_Log) and then extend your model with STORE IN "<route_name>" WITH TABLE "MachineProductionData". Coreflux handles connection pooling, retries, and error handling, so you focus on modeling topics and transformations instead of boilerplate database code.
The best managed database for your MQTT IoT data depends on your data structure, query needs, and analytics goals. Use the comparison table below to help you decide:
| Database | Best For | Example Use Cases |
|---|---|---|
| PostgreSQL | Strong consistency, relational schemas, complex SQL queries | Industrial sensor networks, transactional events, analytics |
| MySQL | Relational data, structured queries, wide compatibility | Inventory systems, production metrics, traditional business records |
| MongoDB | Flexible, evolving schemas; document storage | Connected devices with variable payloads, IoT telemetry with changing formats |
| OpenSearch | Full-text search, analytics, dashboards, log indexing | Time-series analytics, monitoring, event logs, IoT search and visualization |
Tip: You can use more than one managed database at the same time by configuring multiple Coreflux routes. This makes it possible to store structured IoT data in PostgreSQL or MySQL, aggregate logs and metrics in OpenSearch, and collect unstructured or schemaless data in MongoDB, all from the same MQTT stream.
Coreflux keeps all processed values available on MQTT topics for real-time consumption, dashboards, or additional pipelines, while Routes persist the same modeled data to your databases for historical queries. In practice, you can subscribe to topics for immediate reactions (alerts, control loops) and query PostgreSQL/MySQL/MongoDB/OpenSearch for aggregates, trends, and long-term analysis. This dual-path design mirrors common patterns in MQTT and IoT data integration guides, where a broker provides live messaging and databases provide durable storage and analytics.
When deploying on the cloud provider, you can use VPC networking to keep all communication between your Coreflux MQTT broker and databases private. The VPC isolates your resources from public internet access, and managed databases support TLS encryption for connections. Additionally, you can create dedicated database users with limited permissions for your Coreflux application, following the principle of least privilege.
Yes. This architecture mirrors patterns used in production MQTT and database integrations, where a broker front-ends device traffic and a managed database tier provides durability and analytics. Managed databases offer automated backups, high availability, and monitoring, while Coreflux MQTT broker can scale horizontally to handle high message throughput. For production, you should also configure firewall rules, use strong credentials, enable TLS for MQTT and database connections, and size your Droplets and clusters based on expected message volume.
Yes. MQTT brokers are often deployed in private networks or edge environments, because MQTT only requires that clients can reach the broker over the network; it does not need the public internet. With the cloud provider, you can keep Coreflux and your databases inside a VPC and only expose what is strictly necessary (for example, a VPN, bastion host, or limited firewall rules). You can also synchronize selected topics with other brokers or cloud regions if you need hybrid or multi-site architectures.
MQTT is optimized for lightweight, event-driven messaging; databases are optimized for storage and querying. Storing every raw message can become expensive or noisy, so best practices recommend modeling data carefully (for example, aggregating metrics, filtering topics, or downsampling). Very low-power devices or ultra-constrained networks might struggle with persistent connections or TLS overhead, in which case you may need to tune QoS levels, batching, and retention policies. As long as you design your models and routes with these trade-offs in mind, MQTT plus managed databases works well for most IoT scenarios.
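As an illustration of the downsampling mentioned above, the Python sketch below averages readings into fixed time windows before they would be stored. It is a generic technique shown outside Coreflux; the function name, the Unix-timestamp input format, and the 60-second window are assumptions.

```python
from statistics import mean


def downsample(readings, window_seconds):
    """Average (unix_timestamp, value) readings into fixed-size time windows."""
    buckets = {}
    for ts, value in readings:
        # Align each reading to the start of its window.
        start = int(ts // window_seconds) * window_seconds
        buckets.setdefault(start, []).append(value)
    return [(start, mean(values)) for start, values in sorted(buckets.items())]


# Four readings at 30-second intervals collapse into two one-minute averages.
readings = [(0, 10.0), (30, 20.0), (60, 30.0), (90, 50.0)]
print(downsample(readings, 60))
```

Storing one averaged row per window instead of every raw message trades detail for lower storage and query cost, so the window size should match how fine-grained your historical analysis needs to be.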
You should choose a managed database based on your IoT data structure, scalability, and how you want to query your device data. The table below summarizes the strengths of each option:
| Database | Best When… | Typical Use Cases | Key Strengths |
|---|---|---|---|
| PostgreSQL | You need complex relational queries, strong consistency, and transactional integrity (ACID support). | Industrial sensor networks, correlating device data with production, needing analytics over joined datasets | Relational schemas, advanced SQL, consistency |
| MySQL | Your workloads are structured, with wide tooling and compatibility needs. | Inventory tracking, traditional business systems, production metrics | Simpler relational needs, broad support |
| MongoDB | Your device payloads and schemas evolve, or you want fast prototyping with flexible, document-based storage. | IoT telemetry with variable formats, rapid development, semi-structured data | Flexible schemas, easy scaling, fast prototyping |
| OpenSearch | You need to analyze, search, or dashboard large volumes of IoT data (logs, time series, events). | Searching sensor data, log analytics, visualization, keyword/time-based queries | Search, full-text, analytics, fast aggregation |
Conclusion
Integrating Coreflux MQTT broker with the cloud provider's managed database services (PostgreSQL, MongoDB, MySQL, or OpenSearch) gives you a complete setup for real-time IoT data processing and storage. Following this tutorial, you've built an automation pipeline that collects, processes, and stores IoT data using low-code development practices.
With Coreflux's architecture and your chosen database's storage features, you can handle large volumes of real-time data and query it for insights. Whether you're monitoring industrial systems, tracking environmental sensors, or managing smart city infrastructure, this setup lets you make data-driven decisions based on both live MQTT topics and historical database queries.
You can learn more about managed databases and explore advanced Droplet configurations in the cloud provider documentation.
You can try the provided use cases or implement your own using Coreflux and the cloud provider. You can also get the free Coreflux MQTT Broker from the cloud servers Marketplace or through the Coreflux website.
Learn more about what you can do with Coreflux and LoT in the Coreflux Docs and Tutorials.
Additional Resources and Documentation
Note: While this tutorial uses the Coreflux or Docker Marketplace image for simplified deployment, you can also install Coreflux MQTT broker directly on Ubuntu. For manual installation instructions, visit the Coreflux Installation Guide.
Cloud Provider Setup Guides
Project Management
- How to Create and Manage Projects on the cloud provider – Organize your IoT infrastructure resources effectively
Network Infrastructure
- How to Create a VPC Network – Set up secure private networking for your MQTT broker and databases
- Firewall Quickstart Guide – Configure security rules for production IoT deployments
Droplet Deployment
- How to Create a Droplet – Complete guide for deploying virtual machines
- Docker 1-Click Application – Deploy pre-configured Docker environment from the cloud provider Marketplace
- Using Docker 1-Click Install Tutorial – Step-by-step Docker setup instructions
Database Management
- Getting Started with PostgreSQL Databases – Quickstart guide for setting up and using managed PostgreSQL on the cloud provider
- How to Manage MongoDB Databases – Create and manage MongoDB databases and users
- How to Connect to MongoDB – Connect to your managed MongoDB cluster from applications
- Introduction to OpenSearch – Understand OpenSearch concepts and search fundamentals
Server Management
- Recommended Droplet Setup – Best practices for server security and configuration
- Connect to Droplet via Console – Access your droplet through the web-based console
Related Cloud Provider Tutorials
- How To Install and Use PostgreSQL on Ubuntu – Manual PostgreSQL installation guide for self-managed deployments
- How to Install MongoDB on Ubuntu – Manual MongoDB installation guide for self-managed deployments
- Introduction to MongoDB – Understand MongoDB concepts and document database fundamentals