Using AI bots, I built a Graphical User Interface (GUI) program (think Windows-style windows and buttons) in the Python language.
Features of the GUI allow me to
- Draw a line drawing of my layout, similar to the drawing programs in Office-type software, which means I can edit each line's length and position using drag and drop. Lines are added and removed with the Add and Delete buttons on the screen.
- Similar to the Lines function, I can Add and Delete icons for Points, Signals and Sensors, with the icons having labels identifying them, e.g. Point1, Signal1, etc. They are also moved using the drag and drop method.
I am running this on a Raspberry Pi 5 single board computer but it should also run on an Arduino or anything that can run the Python3 language (the AI bots can convert to another language if you are so inclined.)
I am running it in the Thonny IDE because that came preinstalled with the Pi Operating System (at the time of writing the version is called bookworm)
I have toggle switches connected to General Purpose Input/Output (GPIO) pins on the Pi which, when toggled, change the colour of the points icons from Blue to Orange to show their state.
There are 3 pages in the GUI "Upper Yard", "Outside" and "Lower Yard/Dock" which correspond to the three sections I divided my layout into.
One GPIO pin can change the colour of more than one icon; for example, at the crossover on the 'Outside' page.
The program reads a "CSV" file that tells it, for each GPIO pin, which icon to control and which page that icon is located on.
A CSV (comma-separated values) file is a text file that stores data in a table-like format with commas separating each value. CSV files can be opened with many spreadsheets. To get a CSV file I use Libre Office Calc spreadsheet (similar to Excel) to change a table then save it in CSV format.
The GPIO handling is the same as "Using a Raspberry Pi for WiFi Control of Points"
Using a Raspberry Pi for WiFi Control of Points
This is a very basic program that could be customised either by using;
- Python if understood or;
- an AI bot (how I wrote it)
Switch Closed |
Future additional functionality
Signal Control is going to be the same as point control but I do have an idea to change the program to have it change the signals when a signal icon is clicked on.
The code
Put this into an IDE (I use Thonny, which comes installed on a Pi) to run it.
import csv
import json
import os
import queue
import socket
import threading
import time
import tkinter as tk
from tkinter import messagebox, simpledialog

import matplotlib
matplotlib.use('TkAgg')  # must be selected before pyplot is imported
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import paho.mqtt.client as mqtt
# Data files live in ~/Desktop/Track_Editor: the saved drawing (JSON) and the
# GPIO-pin-to-icon table (CSV).
HOME_DIR = os.path.expanduser('~')
TRACK_EDITOR_DIR = os.path.join(HOME_DIR, 'Desktop', 'Track_Editor')
LAYOUT_FILE, PIN_ICON_MAPPING_FILE = (
    os.path.join(TRACK_EDITOR_DIR, file_name)
    for file_name in ('track_layout.json', 'pin_icon_mapping.csv')
)
class TrainTrackEditor:
    """Three-page track-diagram editor whose icons follow GPIO state over MQTT.

    Each page is drawn on a matplotlib axes embedded in a Tk window.  Lines,
    points, signals and sensors can be added, deleted, relabelled and dragged
    with the mouse.  MQTT messages received under ``train/gpio/#`` carry a
    JSON object with ``pin`` and ``state`` keys; the CSV pin mapping (columns
    ``Pin``, ``Icon``, ``Label``, ``Page``, ``Action``) says which icon(s)
    each pin recolours.
    """

    # Element lists stored for every page, and the pages that always exist.
    ELEMENT_KINDS = ("sensors", "points", "signals", "lines")
    SCREEN_NUMBERS = (1, 2, 3)

    # Axes extents (data units) used on every page.
    X_LIMITS = (0, 10)
    Y_LIMITS = (0, 8)

    # Half-width of the square hit box around an icon or a line endpoint.
    HIT_RADIUS = 0.3

    # How often (ms) the Tk thread applies updates queued by the MQTT thread.
    MQTT_POLL_MS = 50

    # CSV icon type -> (element list, colour when state is high, colour when low).
    ICON_COLOURS = {
        'Point': ('points', '#FFA500', 'blue'),  # hex value is orange
        'Signal': ('signals', 'green', 'red'),
        'Sensor': ('sensors', 'yellow', 'black'),
    }

    # Draw order: (element list, marker, colour used when none is stored).
    ICON_MARKERS = (
        ('sensors', 'o', 'black'),
        ('signals', '^', 'red'),
        ('points', 's', 'blue'),
    )

    # Delete mode -> (element list, noun for messages, hit radius).
    # Signals keep their wider radius for easier deletion.
    DELETE_TARGETS = {
        'delete_signal': ('signals', 'signal', 0.5),
        'delete_point': ('points', 'point', 0.3),
        'delete_sensor': ('sensors', 'sensor', 0.3),
    }

    # Order in which label boxes and icons are hit-tested on a click.
    LABEL_TARGETS = (('sensors', 'sensor'), ('points', 'point'), ('signals', 'signal'))
    DRAG_TARGETS = (('points', 'point'), ('signals', 'signal'), ('sensors', 'sensor'))

    def __init__(self, root):
        """Load saved data, build the UI, then start listening for MQTT updates.

        Args:
            root: the ``tk.Tk`` window that hosts the editor.
        """
        self.root = root
        self.root.title("Train Track Editor")

        # Try to load existing layout
        self.current_screen = 1
        self.screen_data = self.load_existing_layout()

        # MQTT configuration
        self.MQTT_HOST = "192.168.4.1"  # Raspberry Pi address
        self.MQTT_PORT = 1883
        self.MQTT_TOPIC_PREFIX = "train/gpio"
        self.RELAY_MAPPING = {}
        self.page_to_screen = {
            "UpperYard": 1,
            "Outside": 2,
            "LowerYard/Dock": 3,
        }
        self.load_pin_mapping()

        # Drag and drop variables
        self.selected_element = None
        self.dragging = False
        self.dragging_mode = None
        self.current_element = None
        self.start_point = None
        self.drawing_line = False
        self.temp_line = None
        # Variables for line dragging
        self.drag_start_x = None
        self.drag_start_y = None

        # MQTT state.  paho invokes its callbacks on the loop_start() thread,
        # so they only enqueue work; the Tk thread applies it.
        self.client = None
        self.mqtt_connected = False
        self._mqtt_updates = queue.Queue()
        self._poll_job = None

        # Build the UI before MQTT starts so no message can arrive while the
        # axes and canvas do not exist yet.
        self.setup_ui()
        self._start_mqtt()
        self._poll_mqtt_updates()

    # ------------------------------------------------------------------ MQTT

    def _start_mqtt(self):
        """Create the MQTT client and start its network thread (best effort)."""
        try:
            print(f"Attempting to connect to MQTT broker at {self.MQTT_HOST}:{self.MQTT_PORT}")
            # Reachability probe is informational only; the socket is closed
            # again straight away.
            try:
                with socket.create_connection((self.MQTT_HOST, self.MQTT_PORT), timeout=3):
                    print(f"Host {self.MQTT_HOST} is reachable on port {self.MQTT_PORT}")
            except OSError as e:
                print(f"Host {self.MQTT_HOST} is not reachable on port {self.MQTT_PORT}: {e}")

            # paho-mqtt 2.x wants an explicit callback API version; VERSION1
            # keeps the callback signatures used in this class.  paho 1.x has
            # no such enum and takes no argument.
            api_versions = getattr(mqtt, "CallbackAPIVersion", None)
            if api_versions is not None:
                client = mqtt.Client(api_versions.VERSION1)
            else:
                client = mqtt.Client()
            client.on_message = self.on_message
            client.on_connect = self.on_connect
            client.on_disconnect = self.on_disconnect
            self.client = client

            # The third connect() argument is the keepalive interval in
            # seconds, not a connection timeout.
            client.connect(self.MQTT_HOST, self.MQTT_PORT, keepalive=5)
            # Subscription happens in on_connect, which also runs after every
            # reconnect.
            client.loop_start()
            print("MQTT client started")
        except Exception as e:
            # Continue even if MQTT fails - the editor UI still works.
            print(f"Error connecting to MQTT broker: {e}")

    def on_connect(self, client, userdata, flags, rc):
        """paho callback: record the connection result and (re)subscribe."""
        if rc == 0:
            self.mqtt_connected = True
            print("Connected to MQTT broker successfully")
            client.subscribe(f"{self.MQTT_TOPIC_PREFIX}/#")
        else:
            connection_results = {
                1: "incorrect protocol version",
                2: "invalid client identifier",
                3: "server unavailable",
                4: "bad username or password",
                5: "not authorized",
            }
            reason = connection_results.get(rc, f"unknown error code {rc}")
            print(f"Failed to connect to MQTT broker: {reason}")

    def on_disconnect(self, client, userdata, rc):
        """paho callback: record the disconnection; non-zero ``rc`` is unexpected."""
        self.mqtt_connected = False
        if rc != 0:
            print(f"Unexpected MQTT disconnection (code {rc}). Will attempt to reconnect.")

    def load_pin_mapping(self):
        """Fill ``RELAY_MAPPING`` (pin -> list of icon mappings) from the CSV file.

        Rows without a Pin, Icon or Label value are ignored, and a row whose
        Pin is not an integer is skipped with a message.  Any other failure
        (missing file, missing column) is printed and leaves the mapping as
        loaded so far.
        """
        try:
            with open(PIN_ICON_MAPPING_FILE, 'r', newline='') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    if not (row['Pin'] and row['Icon'] and row['Label']):
                        continue
                    try:
                        pin = int(row['Pin'])
                    except ValueError:
                        print(f"Skipping pin mapping line {reader.line_num}: "
                              f"invalid pin {row['Pin']!r}")
                        continue
                    # One pin may drive several icons, so each pin maps to a list.
                    self.RELAY_MAPPING.setdefault(pin, []).append({
                        'icon': row['Icon'],
                        'label': row['Label'],
                        'page': row['Page'],
                        'action': row['Action'],
                    })
            print(f"Loaded pin mapping: {self.RELAY_MAPPING}")
        except Exception as e:
            print(f"Error loading pin mapping: {e}")

    def on_message(self, client, userdata, msg):
        """paho callback: parse a GPIO update and queue it for the Tk thread.

        The payload must be a JSON object with ``pin`` and ``state``.  Nothing
        is drawn here because this runs on the MQTT network thread.
        """
        try:
            payload = json.loads(msg.payload.decode())
        except ValueError:
            # Covers both undecodable bytes and malformed JSON.
            print(f"[MQTT] Invalid JSON message: {msg.payload}")
            return
        print(f"[MQTT] Received message on topic {msg.topic}: {payload}")
        if not isinstance(payload, dict):
            print(f"[MQTT] Error processing message: expected a JSON object, got {payload!r}")
            return

        pin = payload.get("pin")
        if isinstance(pin, str):
            # Accept "17" as well as 17; the mapping keys are ints.
            try:
                pin = int(pin)
            except ValueError:
                pass
        if pin is None or pin not in self.RELAY_MAPPING:
            print(f"[MQTT] No mapping found for GPIO {pin}")
            return
        self._mqtt_updates.put((pin, payload.get("state")))

    def _poll_mqtt_updates(self):
        """Tk-thread timer: apply queued updates, then reschedule itself."""
        try:
            self._drain_mqtt_updates()
        finally:
            self._poll_job = self.root.after(self.MQTT_POLL_MS, self._poll_mqtt_updates)

    def _drain_mqtt_updates(self):
        """Apply every queued (pin, state) update without blocking."""
        while True:
            try:
                pin, state = self._mqtt_updates.get_nowait()
            except queue.Empty:
                return
            try:
                self._apply_pin_state(pin, state)
            except Exception as e:
                print(f"[MQTT] Error processing message: {e}")

    def _apply_pin_state(self, pin, state):
        """Recolour every icon mapped to ``pin``; ``state == 1`` means high."""
        for mapping in self.RELAY_MAPPING.get(pin, ()):
            icon = mapping['icon']
            label = mapping['label']
            page = mapping['page']
            print(f"[MQTT] Processing: GPIO {pin} state {state}, icon {icon}, label {label}, page {page}")
            self.toggle_element_color(icon, label, page, state == 1)

    # ------------------------------------------------------- layout storage

    @classmethod
    def _empty_layout(cls):
        """Return a fresh layout with an empty list per element kind per page."""
        return {
            number: {kind: [] for kind in cls.ELEMENT_KINDS}
            for number in cls.SCREEN_NUMBERS
        }

    @classmethod
    def _normalise_layout(cls, loaded):
        """Convert JSON-loaded layout data to the in-memory form.

        Screen keys become ints, and any page or element list missing from
        ``loaded`` is filled in empty so later lookups cannot fail.
        """
        layout = cls._empty_layout()
        for key, page in loaded.items():
            merged = layout.setdefault(int(key), {kind: [] for kind in cls.ELEMENT_KINDS})
            merged.update(page)
        return layout

    def load_existing_layout(self):
        """Return the saved layout, or an empty one if it is missing or unreadable."""
        try:
            if os.path.exists(LAYOUT_FILE):
                with open(LAYOUT_FILE, "r") as f:
                    return self._normalise_layout(json.load(f))
        except Exception as e:
            print(f"Error loading layout: {e}")
        return self._empty_layout()

    def save_layout(self):
        """Write all pages to the layout file and report the outcome in a dialog."""
        try:
            # JSON object keys must be strings.
            save_data = {str(k): v for k, v in self.screen_data.items()}
            os.makedirs(os.path.dirname(LAYOUT_FILE), exist_ok=True)
            with open(LAYOUT_FILE, "w") as f:
                json.dump(save_data, f, indent=4)
            messagebox.showinfo("Success", "Layout saved successfully!")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to save layout: {e}")

    def load_layout(self):
        """Replace the current pages with the saved layout and redraw."""
        try:
            if not os.path.exists(LAYOUT_FILE):
                messagebox.showinfo("Info", "No saved layout found.")
                return
            with open(LAYOUT_FILE, "r") as f:
                self.screen_data = self._normalise_layout(json.load(f))
            # The old selection refers to an element that no longer exists.
            self.selected_element = None
            self.update_screen()
            messagebox.showinfo("Success", "Layout loaded successfully!")
        except Exception as e:
            messagebox.showerror("Error", f"Failed to load layout: {e}")

    # ------------------------------------------------------------------- UI

    def setup_ui(self):
        """Create the button frames and the embedded matplotlib canvas."""
        # Screen button frame
        self.screen_button_frame = tk.Frame(self.root)
        self.screen_button_frame.pack(side=tk.TOP, anchor='w')
        # Save/Load button frame
        self.save_load_button_frame = tk.Frame(self.root)
        self.save_load_button_frame.pack(side=tk.TOP, anchor='e')
        # Add/Delete button frame
        self.add_controls_frame = tk.Frame(self.root)
        self.add_controls_frame.pack(side=tk.LEFT, fill=tk.Y)
        # Canvas for drawing
        self.canvas_frame = tk.Frame(self.root)
        self.canvas_frame.pack(side=tk.TOP, fill=tk.BOTH, expand=True)

        # A plain Figure is used instead of pyplot so that pyplot does not
        # create its own hidden Tk window alongside this one.
        self.fig = Figure(figsize=(6, 4))
        self.ax = self.fig.add_subplot(111)
        self.canvas = FigureCanvasTkAgg(self.fig, master=self.canvas_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
        self.ax.set_xlim(*self.X_LIMITS)
        self.ax.set_ylim(*self.Y_LIMITS)
        self.ax.grid(True)

        # Connect interactive events
        self.canvas.mpl_connect("button_press_event", self.on_click)
        self.canvas.mpl_connect("motion_notify_event", self.on_drag)
        self.canvas.mpl_connect("button_release_event", self.on_release)

        self.add_screen_buttons()
        self.add_save_load_buttons()
        self.add_control_buttons()
        # Draw the initial screen
        self.update_screen()

    def add_screen_buttons(self):
        """Add one page-switching button per screen."""
        captions = ("Upper Yard", "Outside", "Lower Yard/Dock")
        for number, caption in enumerate(captions, start=1):
            # number is bound as a default so each button keeps its own page.
            tk.Button(self.screen_button_frame, text=caption,
                      command=lambda n=number: self.switch_screen(n)).pack(side=tk.LEFT)

    def add_save_load_buttons(self):
        """Add the Save Layout and Load Layout buttons."""
        tk.Button(self.save_load_button_frame, text="Save Layout",
                  command=self.save_layout).pack(side=tk.RIGHT)
        tk.Button(self.save_load_button_frame, text="Load Layout",
                  command=self.load_layout).pack(side=tk.RIGHT)

    def add_control_buttons(self):
        """Add the Add/Delete buttons for sensors, points, signals and lines."""
        buttons = (
            ("Add Sensor", self.add_sensor),
            ("Add Point", self.add_point),
            ("Add Signal", self.add_signal),
            ("Add Line", self.start_line),
            ("Delete Sensor", self.prepare_delete_sensor),
            ("Delete Point", self.prepare_delete_point),
            ("Delete Signal", self.prepare_delete_signal),
            ("Delete Line", self.prepare_delete_line),
        )
        for caption, handler in buttons:
            tk.Button(self.add_controls_frame, text=caption, command=handler).pack(side=tk.TOP)

    def switch_screen(self, screen_number):
        """Show page ``screen_number``."""
        self.current_screen = screen_number
        self.update_screen()

    # ------------------------------------------------------- adding / deleting

    def _add_icon(self, kind, noun, x, color):
        """Ask for a label and add a new icon near the top-left of the page."""
        label = simpledialog.askstring("Input", f"Enter {noun} label:")
        if label:
            self.screen_data[self.current_screen][kind].append({
                'x': x,
                'y': 7.8,
                'label': label,
                'color': color,
            })
            self.update_screen()

    def add_sensor(self):
        """Prompt for a label and add a sensor icon to the current page."""
        self._add_icon('sensors', 'sensor', 0.25, 'black')

    def add_point(self):
        """Prompt for a label and add a point icon to the current page."""
        self._add_icon('points', 'point', 1.0, 'blue')

    def add_signal(self):
        """Prompt for a label and add a signal icon to the current page."""
        self._add_icon('signals', 'signal', 1.5, 'red')

    def start_line(self):
        """Arm line drawing: the next two canvas clicks set the endpoints."""
        self.drawing_line = True
        self.start_point = None
        self.temp_line = None

    def prepare_delete_sensor(self):
        """Arm deletion: the next canvas click deletes the sensor under it."""
        self.dragging_mode = 'delete_sensor'

    def prepare_delete_point(self):
        """Arm deletion: the next canvas click deletes the point under it."""
        self.dragging_mode = 'delete_point'

    def prepare_delete_signal(self):
        """Arm deletion: the next canvas click deletes the signal under it."""
        self.dragging_mode = 'delete_signal'

    def prepare_delete_line(self):
        """Arm deletion: the next canvas click deletes the line under it."""
        self.dragging_mode = 'delete_line'

    # ------------------------------------------------------------ hit testing

    def is_point_on_line(self, x, y, line):
        """Return True if (x, y) lies within 0.3 units of the line segment.

        Args:
            x, y: position in axes data units.
            line: dict with ``start`` and ``end`` coordinate pairs.
        """
        x1, y1 = line["start"]
        x2, y2 = line["end"]
        line_length = ((x2 - x1) ** 2 + (y2 - y1) ** 2) ** 0.5
        if line_length == 0:  # degenerate segment: treat it as a point
            return abs(x - x1) < 0.3 and abs(y - y1) < 0.3
        # Perpendicular distance from the point to the infinite line.
        distance = abs((y2 - y1) * x - (x2 - x1) * y + x2 * y1 - y2 * x1) / line_length
        if distance > 0.3:
            return False
        # Projection parameter: 0 at the start, 1 at the end of the segment.
        projection = ((x - x1) * (x2 - x1) + (y - y1) * (y2 - y1)) / (line_length ** 2)
        return 0 <= projection <= 1

    @staticmethod
    def _within(x, y, cx, cy, radius):
        """Return True if (x, y) is inside the square of half-width ``radius`` at (cx, cy)."""
        return abs(x - cx) < radius and abs(y - cy) < radius

    # ------------------------------------------------------------ mouse events

    def on_click(self, event):
        """Handle a mouse press on the canvas.

        Priority: armed delete mode, armed line drawing, line drag, label
        edit, icon drag.  A click on empty canvas clears the selection.
        """
        if event.inaxes != self.ax:
            return
        screen = self.screen_data[self.current_screen]
        x, y = event.xdata, event.ydata

        if self.dragging_mode == 'delete_line' or self.dragging_mode in self.DELETE_TARGETS:
            self._handle_delete_click(screen, x, y)
            return
        # Drawing is checked before hit-testing so a new line can start or
        # end on top of existing track.
        if self.drawing_line:
            self._handle_draw_click(screen, x, y)
            return
        if self._begin_line_drag(screen, x, y):
            return
        if self._edit_label_at(screen, x, y):
            return
        if self._begin_icon_drag(screen, x, y):
            return

        # Nothing was hit: clear any selection
        if self.selected_element is not None:
            self.selected_element = None
            self.dragging = False
            self.update_screen()  # Redraw to remove red dots

    def _handle_delete_click(self, screen, x, y):
        """Delete the armed kind of element under the click; delete mode is one-shot."""
        mode = self.dragging_mode
        self.dragging_mode = None
        if mode == 'delete_line':
            noun = 'line'
            items = screen['lines']
            hit = next((i for i, line in enumerate(items)
                        if self.is_point_on_line(x, y, line)), None)
        else:
            kind, noun, radius = self.DELETE_TARGETS[mode]
            items = screen[kind]
            hit = next((i for i, icon in enumerate(items)
                        if self._within(x, y, icon["x"], icon["y"], radius)), None)

        if hit is None:
            messagebox.showinfo("Info", f"Click on a {noun} to delete it.")
            return
        removed = items.pop(hit)
        if removed is self.selected_element:
            self.selected_element = None
        if noun == 'line':
            print(f"Deleting line {hit}")
        else:
            print(f"Deleting {noun} {removed.get('label', 'unlabeled')}")
        self.update_screen()

    def _handle_draw_click(self, screen, x, y):
        """First click fixes the new line's start, second click its end."""
        if self.start_point is None:
            self.start_point = (x, y)
            self.temp_line = {"start": self.start_point, "end": self.start_point}
            return
        screen["lines"].append({"start": self.start_point, "end": (x, y)})
        self.start_point = None
        self.drawing_line = False
        self.temp_line = None
        self.update_screen()

    def _begin_line_drag(self, screen, x, y):
        """Start dragging a line endpoint or a whole line; return True if one was hit."""
        radius = self.HIT_RADIUS
        for line in screen["lines"]:
            if self._within(x, y, line["start"][0], line["start"][1], radius):
                mode = 'line_start'
            elif self._within(x, y, line["end"][0], line["end"][1], radius):
                mode = 'line_end'
            elif self.is_point_on_line(x, y, line):
                mode = 'line_move'
                # Reference position for computing incremental movement.
                self.drag_start_x = x
                self.drag_start_y = y
            else:
                continue
            self.selected_element = line
            self.dragging = True
            self.dragging_mode = mode
            return True
        return False

    def _edit_label_at(self, screen, x, y):
        """Open the label dialog if a label box was clicked; return True if so."""
        for kind, noun in self.LABEL_TARGETS:
            for icon in screen[kind]:
                # The label is drawn 0.2 units below the icon.
                if abs(x - icon["x"]) < 0.3 and abs(y - (icon["y"] - 0.2)) < 0.1:
                    new_label = simpledialog.askstring(
                        "Input",
                        f"Enter label for {noun}:",
                        initialvalue=icon.get("label", ""),
                    )
                    if new_label is not None and new_label.strip():
                        icon["label"] = new_label.strip()
                        self.update_screen()
                    return True
        return False

    def _begin_icon_drag(self, screen, x, y):
        """Start dragging the point, signal or sensor under the click, if any."""
        for kind, mode in self.DRAG_TARGETS:
            for icon in screen[kind]:
                if self._within(x, y, icon["x"], icon["y"], self.HIT_RADIUS):
                    self.selected_element = icon
                    self.dragging = True
                    self.dragging_mode = mode
                    return True
        return False

    def on_drag(self, event):
        """Handle mouse motion: move the dragged element or preview a new line."""
        if not event.inaxes:
            return
        x, y = event.xdata, event.ydata

        if self.dragging and self.selected_element:
            element = self.selected_element
            mode = self.dragging_mode
            if mode == 'line_start':
                element["start"] = (x, y)
                self.update_screen()
            elif mode == 'line_end':
                element["end"] = (x, y)
                self.update_screen()
            elif mode == 'line_move':
                # Shift both endpoints by the movement since the last event.
                dx = x - self.drag_start_x
                dy = y - self.drag_start_y
                x1, y1 = element["start"]
                x2, y2 = element["end"]
                element["start"] = (x1 + dx, y1 + dy)
                element["end"] = (x2 + dx, y2 + dy)
                self.drag_start_x = x
                self.drag_start_y = y
                self.update_screen()
            elif mode in ('point', 'signal', 'sensor'):
                element["x"] = x
                element["y"] = y
                self.update_screen()

        if self.drawing_line and self.start_point is not None:
            self.temp_line = {"start": self.start_point, "end": (x, y)}
            self.update_screen()

    def on_release(self, event):
        """Handle mouse release: stop dragging but keep the selection."""
        self.dragging = False
        # selected_element is kept so the red dots remain until clicking elsewhere

    # --------------------------------------------------------------- drawing

    def toggle_element_color(self, icon_type, label, page, is_high):
        """Recolour the first icon of ``icon_type`` labelled ``label`` on ``page``.

        Args:
            icon_type: ``'Point'``, ``'Signal'`` or ``'Sensor'``.
            label: label text of the icon to change.
            page: page name as used in ``page_to_screen``.
            is_high: True selects the active colour, False the idle colour.
        """
        screen_num = self.page_to_screen.get(page)
        if not screen_num:
            print(f"Unknown page: {page}")
            return
        screen = self.screen_data[screen_num]
        print(f"Toggling {icon_type} {label} on page {page} (screen {screen_num}) to {'high' if is_high else 'low'}")

        style = self.ICON_COLOURS.get(icon_type)
        if style is not None:
            kind, high_colour, low_colour = style
            for element in screen[kind]:
                if element.get('label') == label:
                    element['color'] = high_colour if is_high else low_colour
                    # Only redraw if the icon is on the page being shown.
                    if screen_num == self.current_screen:
                        self.update_screen()
                    return
        print(f"No matching {icon_type} with label {label} found on page {page}")

    def update_screen(self):
        """Clear the axes and redraw every element of the current page."""
        ax = self.ax
        ax.clear()
        ax.set_xlim(*self.X_LIMITS)
        ax.set_ylim(*self.Y_LIMITS)
        ax.grid(True)
        screen = self.screen_data[self.current_screen]

        # Lines are always black and solid.
        for line in screen["lines"]:
            (x1, y1), (x2, y2) = line["start"], line["end"]
            ax.plot([x1, x2], [y1, y2], color="black", linewidth=2)
            # Identity, not equality: two lines with the same coordinates
            # must not both appear selected.
            if line is self.selected_element:
                if self.dragging_mode in ('line_start', 'line_move'):
                    ax.plot(x1, y1, 'ro', markersize=6)
                if self.dragging_mode in ('line_end', 'line_move'):
                    ax.plot(x2, y2, 'ro', markersize=6)

        # Dashed preview of the line being drawn.
        if self.temp_line is not None:
            ax.plot(
                [self.temp_line["start"][0], self.temp_line["end"][0]],
                [self.temp_line["start"][1], self.temp_line["end"][1]],
                color="black",
                linewidth=2,
                linestyle="--",
            )

        # Icons, each with its label just below it.
        for kind, marker, fallback_colour in self.ICON_MARKERS:
            for icon in screen[kind]:
                ax.plot(icon["x"], icon["y"], marker,
                        color=icon.get('color', fallback_colour),
                        markersize=12.5, markeredgecolor='black', markeredgewidth=1)
                ax.text(icon["x"], icon["y"] - 0.2, icon.get("label", ""),
                        ha='center', va='top', fontsize=8)
        self.canvas.draw()

    # -------------------------------------------------------------- shutdown

    def cleanup(self):
        """Stop the MQTT polling timer and shut the MQTT client down."""
        poll_job = getattr(self, '_poll_job', None)
        if poll_job is not None:
            try:
                self.root.after_cancel(poll_job)
            except Exception as e:
                print(f"Error cancelling MQTT polling: {e}")
            self._poll_job = None

        client = getattr(self, 'client', None)
        if client is None:
            return
        # Stop the network thread even when the broker is not currently
        # connected, otherwise it would keep trying to reconnect.
        try:
            client.loop_stop()
            client.disconnect()
        except Exception as e:
            print(f"Error shutting down MQTT client: {e}")

    def on_closing(self):
        """Window-close handler: stop MQTT, save the layout and destroy the window."""
        self.cleanup()
        self.save_layout()
        self.root.destroy()
def main():
    """Open the 800x600 editor window and run the Tk event loop until it is closed."""
    window = tk.Tk()
    window.geometry("800x600")
    app = TrainTrackEditor(window)
    # Route the window-manager close button through the editor's own handler.
    window.protocol("WM_DELETE_WINDOW", app.on_closing)
    window.mainloop()


if __name__ == "__main__":
    main()
No comments:
Post a Comment