รายละเอียดของ code ในไฟล์ command_node.py และ drive_node.py
อธิบายโค้ดโดยละเอียด Command_node.py
1. การนำเข้าไลบรารีที่ใช้
import rclpy from rclpy.node import Node from std_msgs.msg import String from pynput import keyboard import threading |
- rclpy: ไลบรารีสำหรับพัฒนา ROS2 nodes ในภาษา Python
- rclpy.node.Node: คลาสพื้นฐานสำหรับสร้าง node ใน ROS2
- std_msgs.msg.String: ชนิดของข้อความที่ใช้ใน ROS2 (ข้อความแบบ string)
- pynput.keyboard: ไลบรารีสำหรับตรวจจับการกดแป้นพิมพ์
- threading: ไลบรารีสำหรับจัดการเธรด (ใช้ในกรณีต้องการทำงานหลายอย่างพร้อมกัน)
2. การสร้างคลาส CommandNode
class CommandNode(Node): def __init__(self): super().__init__(‘command_node’) |
- สร้าง node ชื่อ command_node โดยเรียกใช้คอนสตรัคเตอร์ของคลาส Node ผ่าน super().
3. สร้าง Publisher
self.publisher_ = self.create_publisher(String, ‘/drive_commands’, 10) |
- สร้าง publisher สำหรับส่งข้อความประเภท String ไปยัง topic ชื่อ /drive_commands.
- 10: คือขนาดของ queue (จำนวนข้อความที่เก็บไว้ในกรณีที่การส่งข้อมูลล่าช้า).
4. แสดงข้อความใน log
self.get_logger().info(‘CommandNode is running. Press keys to send commands.’) |
- แสดงข้อความใน log เพื่อบอกว่าตัว node ทำงานเรียบร้อยแล้ว.
5. ตั้งค่าการฟังแป้นพิมพ์
self.get_logger().info(‘CommandNode is ruself.listener = keyboard.Listener(on_press=self.on_press) self.listener.start() |
- ใช้ keyboard.Listener เพื่อตรวจจับการกดแป้นพิมพ์ โดยเรียกใช้ฟังก์ชัน on_press ทุกครั้งที่กดแป้น.
6. จัดการการกดแป้นพิมพ์
def on_press(self, key): try: if key.char == ‘i’: self.publish_command(‘Forward’) elif key.char == ‘j’: self.publish_command(‘TurnLeft’) elif key.char == ‘l’: self.publish_command(‘TurnRight’) elif key.char == ‘m’: self.publish_command(‘Backward’) elif key.char == ‘k’: self.publish_command(‘Stop’) except AttributeError: pass |
- ฟังก์ชันนี้จะถูกเรียกเมื่อมีการกดแป้นพิมพ์.
- ตรวจสอบว่าแป้นที่กดเป็น i, j, l, m หรือ k และเรียก publish_command พร้อมส่งคำสั่งที่เกี่ยวข้อง.
- AttributeError: จัดการกรณีที่กดปุ่มพิเศษ เช่น Shift หรือ Ctrl.
7. ส่งคำสั่งไปยัง topic
def publish_command(self, command): msg = String() msg.data = command self.publisher_.publish(msg) self.get_logger().info(f’Publishing: “{msg.data}”‘) |
- สร้างข้อความประเภท String.
- กำหนดค่า msg.data เป็นคำสั่งที่ต้องการส่ง เช่น ‘Forward’, ‘Stop’.
- ส่งข้อความผ่าน publisher ไปยัง topic /drive_commands.
- แสดงข้อความใน log เพื่อบอกว่ากำลังส่งคำสั่ง.
8. ฟังก์ชันหลัก
def main(args=None): rclpy.init(args=args) node = CommandNode() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() |
- rclpy.init(): เริ่มต้นระบบ ROS2.
- สร้าง CommandNode.
- rclpy.spin(node): รัน node จนกว่าจะกดหยุดด้วย Ctrl+C.
- เมื่อหยุดการทำงาน:
- ทำลาย node ด้วย destroy_node().
- ปิดระบบ ROS2 ด้วย rclpy.shutdown().
9. จุดเริ่มต้นของโปรแกรม
if __name__ == ‘__main__’: main() |
- หากรันไฟล์นี้โดยตรง จะเรียกใช้ฟังก์ชัน main.
รายละเอียดโค้ด drive_node.py
1. การนำเข้าไลบรารีที่ใช้
rt rclpy from rclpy.node import Node from std_msgs.msg import String |
- rclpy: ไลบรารีหลักของ ROS2 ที่ใช้สำหรับสร้าง node และจัดการระบบสื่อสาร.
- Node: คลาสพื้นฐานที่ใช้ในการสร้าง node ใน ROS2.
- String: ชนิดของข้อความที่ใช้สำหรับส่งข้อมูลผ่าน topic (ในที่นี้คือข้อความแบบ string).
2. การสร้างคลาส DriveNode
class DriveNode(Node): |
- สร้างคลาส DriveNode ซึ่งสืบทอดคุณสมบัติจากคลาส Node.
- ทำหน้าที่เป็น node สำหรับรับคำสั่งและดำเนินการตามคำสั่งนั้น.
3. เมธอด __init__
def __init__(self): super().__init__(‘drive_node’) |
- เมธอดนี้เป็นตัวกำหนดค่าพื้นฐานของ DriveNode.
- super().__init__(‘drive_node’): เรียก Node โดยตั้งชื่อว่า ‘drive_node’.
4. สร้าง Subscriber
self.subscription = self.create_subscription( String, ‘/drive_commands’, self.listener_callback, 10 ) |
สร้าง subscriber เพื่อรับข้อความจาก topic ชื่อ /drive_commands.
- String: ชนิดของข้อความที่รับ (ข้อความแบบ string).
- /drive_commands: ชื่อของ topic ที่จะรับข้อมูล.
- self.listener_callback: ฟังก์ชันที่จะถูกเรียกเมื่อมีข้อความใหม่.
- 10: ขนาดของคิว (queue size) สำหรับเก็บข้อความที่รอการประมวลผล.
self.get_logger().info(‘DriveNode is ready to receive commands.’) |
- แสดงข้อความ log เพื่อแจ้งว่าพร้อมรับคำสั่งแล้ว.
5. ฟังก์ชัน listener_callback
def listener_callback(self, msg): command = msg.data self.get_logger().info(f’Received command: “{command}”‘) |
- ฟังก์ชันนี้จะถูกเรียกเมื่อได้รับข้อความจาก topic /drive_commands.
- msg.data: ดึงข้อมูลข้อความจาก message.
if command == ‘Forward’: self.forward() elif command == ‘TurnLeft’: self.turn_left() elif command == ‘TurnRight’: self.turn_right() elif command == ‘Backward’: self.backward() elif command == ‘Stop’: self.stop() else: self.get_logger().warn(f’Unknown command: “{command}”‘) |
ตรวจสอบค่าของคำสั่งที่ได้รับ (command):
- Forward: เรียกฟังก์ชัน self.forward().
- TurnLeft: เรียกฟังก์ชัน self.turn_left().
- TurnRight: เรียกฟังก์ชัน self.turn_right().
- Backward: เรียกฟังก์ชัน self.backward().
- Stop: เรียกฟังก์ชัน self.stop().
- คำสั่งที่ไม่รู้จัก (Unknown command) จะเตือนผ่าน log.
6. ฟังก์ชันสำหรับการเคลื่อนที่
def forward(self): self.get_logger().info(‘Executing forward operation.’) def turn_left(self): self.get_logger().info(‘Executing turn left operation.’) def turn_right(self): self.get_logger().info(‘Executing turn right operation.’) def backward(self): self.get_logger().info(‘Executing backward operation.’) def stop(self): self.get_logger().info(‘Executing stop operation.’) |
ฟัก์ชันเหล่านี้ใช้แสดง log สำหรับการดำเนินการตามคำสั่ง:
- forward(): เดินหน้า.
- turn_left(): เลี้ยวซ้าย.
- turn_right(): เลี้ยวขวา.
- backward(): ถอยหลัง.
- stop(): หยุด.
7. ฟังก์ชัน main
def main(args=None): rclpy.init(args=args) drive_node = DriveNode() try: rclpy.spin(drive_node) except KeyboardInterrupt: pass finally: drive_node.destroy_node() rclpy.shutdown() |
- main: จุดเริ่มต้นของโปรแกรม.
- rclpy.init(args=args): เริ่มต้นระบบ ROS2.
- drive_node = DriveNode(): สร้าง DriveNode.
- rclpy.spin(drive_node): รัน DriveNode เพื่อรอรับคำสั่ง.
- KeyboardInterrupt: หยุดโปรแกรมด้วยการกด Ctrl+C.
- drive_node.destroy_node(): ทำลาย node เพื่อปล่อยทรัพยากร.
จากรายละเอียดของ code ทั้งสอง เราสามารถปรับปรุงและเพิ่มส่วนควบคุมกับทำงาน การรับส่งข้อมูลระหว่าง node รวมถึงเพิ่ม node อื่นๆที่ต้องการได้จากตัวอย่างข้างต้น