To control my points without having to use long cable runs I decided to use WiFi to send switch position information to the points out in the backyard.
To achieve this I am using WiFi capable Raspberry Pi5 and ESP 01/8266 WiFi Relay Modules.
![]() |
ESP ESP01/8266 WiFi Relay Module.ESP01 on Right Hand Side |
A 12V Sealed Acid Battery, topped up by a solar panel, will provide 5V (via a step down converter) to power the relay modules, the associated servo tester point controllers and the servos as previously used.
A python program running in the background (known as GPIO Daemon) will monitor the GPIO pins of the Pi to detect any change.
The point operation goes like this;
- Switch is closed and the associated GPIO pin is changed from High to Low.
- With the change of state of the GPIO pin the daemon sends a signal to the "Point Signal Control" program
- When the signal is received by the Point Signal Control program it sends a signal to the ESP 01/8266 relay modules.
- So that it knows which relay to activate it consults a "CSV" file with that information. A CSV (comma-separated values) file is a text file that stores data in a table-like format with commas separating each value. CSV files can be created and edited with a spreadsheets.(Excel or Libre Office Calc which is what I use)
- The WiFi relay module then changes the state of the relay connected to the servo tester, controlling the point throw. This is controlled by an ESP01/8266 which runs an Arduino "Sketch"
- The relays changing state is the same as the changing of state of the switch (acts as a proxy switch) as was used in the previous iteration.
- See Simple Inexpensive Servo Controller and Servo Controlled Points
![]() |
Line Drawing of servo Tester |
![]() |
Servo Tester used to control point throw |
import paho.mqtt.client as mqtt
import time
import json # Import JSON for structured messages
# Configuration
MQTT_HOST = "192.168.0.110" # Change to your MQTT broker's IP
MQTT_PORT = 1883
MQTT_TOPIC_PREFIX = "train/gpio"
GPIO_PINS = [4, 25] # Adjust to your GPIO pins
DEBOUNCE_TIME = 0.2 # Debounce time in seconds
class GPIODaemon:
def __init__(self):
self.lines = {}
self.last_states = {pin: None for pin in GPIO_PINS} # Track last states
self.setup_gpio()
self.setup_mqtt()
def setup_gpio(self):
chip = gpiod.Chip('gpiochip0') # Adjust if necessary
for pin in GPIO_PINS:
line = chip.get_line(pin)
try:
line.request(consumer='gpio_daemon', type=gpiod.LINE_REQ_EV_BOTH_EDGES)
self.lines[pin] = line
print(f"[GPIO Daemon] Successfully requested GPIO {pin}")
except OSError as e:
print(f"[GPIO Daemon] Error requesting GPIO {pin}: {e}")
continue # Skip this GPIO pin and continue with the others
def setup_mqtt(self):
self.client = mqtt.Client()
self.client.connect(MQTT_HOST, MQTT_PORT, 60)
self.client.loop_start()
def publish_gpio_state(self, pin, state):
topic = f"{MQTT_TOPIC_PREFIX}/{pin}"
# Create a JSON message with pin and state
message = {
"pin": pin,
"state": state
}
# Convert the dictionary to a JSON string
json_message = json.dumps(message)
self.client.publish(topic, json_message)
print(f"[GPIO Daemon] Published {json_message} to {topic}")
def run(self):
try:
while True:
for pin in GPIO_PINS:
if pin in self.lines: # Check if the line was successfully requested
line = self.lines[pin]
event = line.event_wait() # Wait for events (no timeout)
if event:
state = line.get_value()
# Only publish if the state has changed
if self.last_states[pin] is None or self.last_states[pin] != state:
self.publish_gpio_state(pin, state)
self.last_states[pin] = state # Update the last known state
time.sleep(DEBOUNCE_TIME) # Wait for debounce time
except KeyboardInterrupt:
print("Exiting GPIO Daemon...")
finally:
self.cleanup()
def cleanup(self):
for pin in GPIO_PINS:
if pin in self.lines: # Ensure the line exists before releasing
line = self.lines[pin]
line.release() # Release the GPIO line
self.client.loop_stop()
self.client.disconnect()
if __name__ == "__main__":
daemon = GPIODaemon()
daemon.run()
import gpiod
import paho.mqtt.client as mqtt
import time
import json # Import JSON for structured messages
# Configuration
MQTT_HOST = "192.168.0.110" # Change to your MQTT broker's IP
MQTT_PORT = 1883
MQTT_TOPIC_PREFIX = "train/gpio"
GPIO_PINS = [4, 25] # Adjust to your GPIO pins
DEBOUNCE_TIME = 0.2 # Debounce time in seconds
class GPIODaemon:
def __init__(self):
self.lines = {}
self.last_states = {pin: None for pin in GPIO_PINS} # Track last states
self.setup_gpio()
self.setup_mqtt()
def setup_gpio(self):
chip = gpiod.Chip('gpiochip0') # Adjust if necessary
for pin in GPIO_PINS:
line = chip.get_line(pin)
try:
line.request(consumer='gpio_daemon', type=gpiod.LINE_REQ_EV_BOTH_EDGES)
self.lines[pin] = line
print(f"[GPIO Daemon] Successfully requested GPIO {pin}")
except OSError as e:
print(f"[GPIO Daemon] Error requesting GPIO {pin}: {e}")
continue # Skip this GPIO pin and continue with the others
def setup_mqtt(self):
self.client = mqtt.Client()
self.client.connect(MQTT_HOST, MQTT_PORT, 60)
self.client.loop_start()
def publish_gpio_state(self, pin, state):
topic = f"{MQTT_TOPIC_PREFIX}/{pin}"
# Create a JSON message with pin and state
message = {
"pin": pin,
"state": state
}
# Convert the dictionary to a JSON string
json_message = json.dumps(message)
self.client.publish(topic, json_message)
print(f"[GPIO Daemon] Published {json_message} to {topic}")
def run(self):
try:
while True:
for pin in GPIO_PINS:
if pin in self.lines: # Check if the line was successfully requested
line = self.lines[pin]
event = line.event_wait() # Wait for events (no timeout)
if event:
state = line.get_value()
# Only publish if the state has changed
if self.last_states[pin] is None or self.last_states[pin] != state:
self.publish_gpio_state(pin, state)
self.last_states[pin] = state # Update the last known state
time.sleep(DEBOUNCE_TIME) # Wait for debounce time
except KeyboardInterrupt:
print("Exiting GPIO Daemon...")
finally:
self.cleanup()
def cleanup(self):
for pin in GPIO_PINS:
if pin in self.lines: # Ensure the line exists before releasing
line = self.lines[pin]
line.release() # Release the GPIO line
self.client.loop_stop()
self.client.disconnect()
if __name__ == "__main__":
daemon = GPIODaemon()
daemon.run()
The point Control Program;
import paho.mqtt.client as mqtt
import json
import csv
# Configuration
MQTT_HOST = "192.168.0.110" # Change to your MQTT broker's IP
MQTT_PORT = 1883
MQTT_TOPIC_PREFIX = "train/points" # Prefix for relay topics
CSV_FILE = "point_signal_control.csv" # Path to the CSV file
# Relay mapping dictionary
RELAY_MAPPING = {}
def load_csv_mapping():
"""
Load the GPIO-to-relay mapping from the CSV file.
"""
global RELAY_MAPPING
try:
with open(CSV_FILE, mode="r") as file:
reader = csv.DictReader(file)
for row in reader:
# Only include rows with valid GPIO and relay_id
if row["GPIO"] and row["relay_id"]:
gpio = int(row["GPIO"])
RELAY_MAPPING[gpio] = {
"relay_id": row["relay_id"],
"action": row["action"],
"gpio_mode": row["gpio_mode"]
}
print("[Relay Control] Loaded GPIO-to-relay mapping:")
for gpio, info in RELAY_MAPPING.items():
print(f" GPIO {gpio}: {info}")
except FileNotFoundError:
print(f"[Relay Control] Error: CSV file '{CSV_FILE}' not found.")
except Exception as e:
print(f"[Relay Control] Error loading CSV file: {e}")
def on_message(client, userdata, msg):
"""
Handle incoming MQTT messages for GPIO updates.
"""
try:
# Parse the JSON message
payload = json.loads(msg.payload.decode())
pin = payload.get("pin") # Extract the pin number
state = payload.get("state") # Extract the state
if pin is not None and pin in RELAY_MAPPING:
# Retrieve the relay information from the mapping
relay_info = RELAY_MAPPING[pin]
relay_id = relay_info["relay_id"]
topic = f"{MQTT_TOPIC_PREFIX}/{relay_id}" # Construct the topic
# Publish the "TOGGLE" message to the relay topic
print(f"[Relay Control] Received GPIO update: GPIO {pin} state {state}")
print(f"[Relay Control] Publishing 'TOGGLE' to topic '{topic}'")
client.publish(topic, "TOGGLE")
else:
print(f"[Relay Control] No mapping found for GPIO {pin}")
except json.JSONDecodeError:
print(f"[Relay Control] Invalid JSON message: {msg.payload}")
except KeyError as e:
print(f"[Relay Control] Missing key in message: {e}")
except Exception as e:
print(f"[Relay Control] Error processing message: {e}")
def main():
# Load the GPIO-to-relay mapping from the CSV file
load_csv_mapping()
# Initialize MQTT client
client = mqtt.Client()
client.on_message = on_message
# Connect to the MQTT broker
client.connect(MQTT_HOST, MQTT_PORT, 60)
# Subscribe to the GPIO topic
client.subscribe("train/gpio/#")
# Start the MQTT loop
print("[Relay Control] Listening for GPIO updates...")
client.loop_forever()
if __name__ == "__main__":
main()
The Arduino Sketch;
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
// WiFi credentials
const char* ssid = "Ringbalin";
const char* password = "TrainsControl";
// MQTT Broker details
const char* mqtt_server = "192.168.4.1";
// Create an ESP8266 WiFi client and MQTT client
WiFiClient espClient;
PubSubClient client(espClient);
// Define relay pins
#define RELAY01_PIN 2 // Adjust these pin numbers as needed
#define RELAY02_PIN 0
// Forward declarations
void callback(char* topic, byte* payload, unsigned int length);
void reconnect();
void toggleRelay(int relayPin);
void setup() {
// Start Serial for debugging
Serial.begin(115200);
delay(10);
// Initialize relay pins as OUTPUT
pinMode(RELAY01_PIN, OUTPUT);
pinMode(RELAY02_PIN, OUTPUT);
// Connect to WiFi
Serial.print("Connecting to WiFi");
WiFi.begin(ssid, password);
while(WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println();
Serial.println("WiFi connected");
// Setup MQTT server and callback
client.setServer(mqtt_server, 1883);
client.setCallback(callback);
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
}
// This function is called when an MQTT message arrives
void callback(char* topic, byte* payload, unsigned int length) {
// Convert payload to string for processing
String messageTemp;
for (unsigned int i = 0; i < length; i++) {
messageTemp += (char)payload[i];
}
Serial.print("Message arrived [");
Serial.print(topic);
Serial.print("] ");
Serial.println(messageTemp);
// Check the topic and message content, then toggle the corresponding relay
if (String(topic) == "train/points/RELAY01" && messageTemp == "TOGGLE") {
toggleRelay(RELAY01_PIN);
} else if (String(topic) == "train/points/RELAY02" && messageTemp == "TOGGLE") {
toggleRelay(RELAY02_PIN);
}
}
// Function to toggle the state of a relay connected to relayPin
void toggleRelay(int relayPin) {
int currentState = digitalRead(relayPin);
// Toggle the state (adjust if your relay is active low or active high)
if (currentState == HIGH) {
digitalWrite(relayPin, LOW);
Serial.print("Relay on pin ");
Serial.print(relayPin);
Serial.println(" turned OFF");
} else {
digitalWrite(relayPin, HIGH);
Serial.print("Relay on pin ");
Serial.print(relayPin);
Serial.println(" turned ON");
}
}
// Function to reconnect to the MQTT broker if the connection is lost
void reconnect() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
String clientId = "ESP8266Client-";
clientId += String(random(0xffff), HEX);
if (client.connect(clientId.c_str())) {
Serial.println(" connected");
// Subscribe to the topics for both relays
client.subscribe("train/points/RELAY01");
client.subscribe("train/points/RELAY02");
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000);
}
}
}
No comments:
Post a Comment