การพัฒนาหุ่นยนต์เคลื่อนที่ด้วย ROS2 ตอนที่ 1.1

รายละเอียดของ code ในไฟล์ command_node.py และ drive_node.py

อธิบายโค้ดโดยละเอียด Command_node.py

1. การนำเข้าไลบรารีที่ใช้

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from pynput import keyboard
import threading
  • rclpy: ไลบรารีสำหรับพัฒนา ROS2 nodes ในภาษา Python
  • rclpy.node.Node: คลาสพื้นฐานสำหรับสร้าง node ใน ROS2
  • std_msgs.msg.String: ชนิดของข้อความที่ใช้ใน ROS2 (ข้อความแบบ string)
  • pynput.keyboard: ไลบรารีสำหรับตรวจจับการกดแป้นพิมพ์
  • threading: ไลบรารีสำหรับจัดการเธรด (ใช้ในกรณีต้องการทำงานหลายอย่างพร้อมกัน)

2. การสร้างคลาส CommandNode

class CommandNode(Node):
    def __init__(self):
        super().__init__(‘command_node’)
  • สร้าง node ชื่อ command_node โดยเรียกใช้คอนสตรัคเตอร์ของคลาส Node ผ่าน super().

3. สร้าง Publisher

self.publisher_ = self.create_publisher(String, ‘/drive_commands’, 10)
  • สร้าง publisher สำหรับส่งข้อความประเภท String ไปยัง topic ชื่อ /drive_commands.
  • 10: คือขนาดของ queue (จำนวนข้อความที่เก็บไว้ในกรณีที่การส่งข้อมูลล่าช้า).

4. แสดงข้อความใน log

self.get_logger().info(‘CommandNode is running. Press keys to send commands.’)
  • แสดงข้อความใน log เพื่อบอกว่าตัว node ทำงานเรียบร้อยแล้ว.

5. ตั้งค่าการฟังแป้นพิมพ์

self.get_logger().info(‘CommandNode is ruself.listener = keyboard.Listener(on_press=self.on_press)
self.listener.start()
  • ใช้ keyboard.Listener เพื่อตรวจจับการกดแป้นพิมพ์ โดยเรียกใช้ฟังก์ชัน on_press ทุกครั้งที่กดแป้น.

6. จัดการการกดแป้นพิมพ์

def on_press(self, key):
    try:
        if key.char == ‘i’:
            self.publish_command(‘Forward’)
        elif key.char == ‘j’:
            self.publish_command(‘TurnLeft’)
        elif key.char == ‘l’:
            self.publish_command(‘TurnRight’)
        elif key.char == ‘m’:
            self.publish_command(‘Backward’)
        elif key.char == ‘k’:
            self.publish_command(‘Stop’)
    except AttributeError:
        pass
  • ฟังก์ชันนี้จะถูกเรียกเมื่อมีการกดแป้นพิมพ์.
  • ตรวจสอบว่าแป้นที่กดเป็น i, j, l, m หรือ k และเรียก publish_command พร้อมส่งคำสั่งที่เกี่ยวข้อง.
  • AttributeError: จัดการกรณีที่กดปุ่มพิเศษ เช่น Shift หรือ Ctrl.

7. ส่งคำสั่งไปยัง topic

def publish_command(self, command):
        msg = String()
        msg.data = command
        self.publisher_.publish(msg)
        self.get_logger().info(f’Publishing: “{msg.data}”‘)
  • สร้างข้อความประเภท String.
  • กำหนดค่า msg.data เป็นคำสั่งที่ต้องการส่ง เช่น ‘Forward’, ‘Stop’.
  • ส่งข้อความผ่าน publisher ไปยัง topic /drive_commands.
  • แสดงข้อความใน log เพื่อบอกว่ากำลังส่งคำสั่ง.

8. ฟังก์ชันหลัก

def main(args=None):
    rclpy.init(args=args)
    node = CommandNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()
  • rclpy.init(): เริ่มต้นระบบ ROS2.
  • สร้าง CommandNode.
  • rclpy.spin(node): รัน node จนกว่าจะกดหยุดด้วย Ctrl+C.
  • เมื่อหยุดการทำงาน:
  • ทำลาย node ด้วย destroy_node().
  • ปิดระบบ ROS2 ด้วย rclpy.shutdown().

9. จุดเริ่มต้นของโปรแกรม

if __name__ == ‘__main__’:
    main()
  • หากรันไฟล์นี้โดยตรง จะเรียกใช้ฟังก์ชัน main.

รายละเอียดโค้ด drive_node.py

1. การนำเข้าไลบรารีที่ใช้

rt rclpy
from rclpy.node import Node
from std_msgs.msg import String
  • rclpy: ไลบรารีหลักของ ROS2 ที่ใช้สำหรับสร้าง node และจัดการระบบสื่อสาร.
  • Node: คลาสพื้นฐานที่ใช้ในการสร้าง node ใน ROS2.
  • String: ชนิดของข้อความที่ใช้สำหรับส่งข้อมูลผ่าน topic (ในที่นี้คือข้อความแบบ string).

2. การสร้างคลาส DriveNode

class DriveNode(Node):
  • สร้างคลาส DriveNode ซึ่งสืบทอดคุณสมบัติจากคลาส Node.
  • ทำหน้าที่เป็น node สำหรับรับคำสั่งและดำเนินการตามคำสั่งนั้น.

3. เมธอด __init__

def __init__(self):
    super().__init__(‘drive_node’)
  • เมธอดนี้เป็นตัวกำหนดค่าพื้นฐานของ DriveNode.
  • super().__init__(‘drive_node’): เรียก Node โดยตั้งชื่อว่า ‘drive_node’.

4. สร้าง Subscriber

self.subscription = self.create_subscription(
    String,
    ‘/drive_commands’,
    self.listener_callback,
    10
)

สร้าง subscriber เพื่อรับข้อความจาก topic ชื่อ /drive_commands.

  • String: ชนิดของข้อความที่รับ (ข้อความแบบ string).
  • /drive_commands: ชื่อของ topic ที่จะรับข้อมูล.
  • self.listener_callback: ฟังก์ชันที่จะถูกเรียกเมื่อมีข้อความใหม่.
  • 10: ขนาดของคิว (queue size) สำหรับเก็บข้อความที่รอการประมวลผล.
self.get_logger().info(‘DriveNode is ready to receive commands.’)
  • แสดงข้อความ log เพื่อแจ้งว่าพร้อมรับคำสั่งแล้ว.

5. ฟังก์ชัน listener_callback

def listener_callback(self, msg):
    command = msg.data
    self.get_logger().info(f’Received command: “{command}”‘)
  • ฟังก์ชันนี้จะถูกเรียกเมื่อได้รับข้อความจาก topic /drive_commands.
  • msg.data: ดึงข้อมูลข้อความจาก message.
if command == ‘Forward’:
    self.forward()
elif command == ‘TurnLeft’:
    self.turn_left()
elif command == ‘TurnRight’:
    self.turn_right()
elif command == ‘Backward’:
    self.backward()
elif command == ‘Stop’:
    self.stop()
else:
    self.get_logger().warn(f’Unknown command: “{command}”‘)

ตรวจสอบค่าของคำสั่งที่ได้รับ (command):

  • Forward: เรียกฟังก์ชัน self.forward().
  • TurnLeft: เรียกฟังก์ชัน self.turn_left().
  • TurnRight: เรียกฟังก์ชัน self.turn_right().
  • Backward: เรียกฟังก์ชัน self.backward().
  • Stop: เรียกฟังก์ชัน self.stop().
  • คำสั่งที่ไม่รู้จัก (Unknown command) จะเตือนผ่าน log.

6. ฟังก์ชันสำหรับการเคลื่อนที่

def forward(self):
    self.get_logger().info(‘Executing forward operation.’)
 
def turn_left(self):
    self.get_logger().info(‘Executing turn left operation.’)
 
def turn_right(self):
    self.get_logger().info(‘Executing turn right operation.’)
 
def backward(self):
    self.get_logger().info(‘Executing backward operation.’)
 
def stop(self):
    self.get_logger().info(‘Executing stop operation.’)

ฟัก์ชันเหล่านี้ใช้แสดง log สำหรับการดำเนินการตามคำสั่ง:

  • forward(): เดินหน้า.
  • turn_left(): เลี้ยวซ้าย.
  • turn_right(): เลี้ยวขวา.
  • backward(): ถอยหลัง.
  • stop(): หยุด.

7. ฟังก์ชัน main

def main(args=None):
    rclpy.init(args=args)
    drive_node = DriveNode()
    try:
        rclpy.spin(drive_node)
    except KeyboardInterrupt:
        pass
    finally:
        drive_node.destroy_node()
        rclpy.shutdown()
  • main: จุดเริ่มต้นของโปรแกรม.
  • rclpy.init(args=args): เริ่มต้นระบบ ROS2.
  • drive_node = DriveNode(): สร้าง DriveNode.
  • rclpy.spin(drive_node): รัน DriveNode เพื่อรอรับคำสั่ง.
  • KeyboardInterrupt: หยุดโปรแกรมด้วยการกด Ctrl+C.
  • drive_node.destroy_node(): ทำลาย node เพื่อปล่อยทรัพยากร.

จากรายละเอียดของ code ทั้งสอง เราสามารถปรับปรุงและเพิ่มส่วนควบคุมกับทำงาน การรับส่งข้อมูลระหว่าง node รวมถึงเพิ่ม node อื่นๆที่ต้องการได้จากตัวอย่างข้างต้น

This entry was posted in Blog 101. Bookmark the permalink.

Leave a Reply

Your email address will not be published. Required fields are marked *