Notice: Function _load_textdomain_just_in_time was called incorrectly. Translation loading for the wp-statistics domain was triggered too early. This is usually an indicator for some code in the plugin or theme running too early. Translations should be loaded at the init action or later. Please see Debugging in WordPress for more information. (This message was added in version 6.7.0.) in /www/wwwroot/robotattractor/wp-includes/functions.php on line 6114

Notice: Function _load_textdomain_just_in_time was called incorrectly. Translation loading for the jetpack domain was triggered too early. This is usually an indicator for some code in the plugin or theme running too early. Translations should be loaded at the init action or later. Please see Debugging in WordPress for more information. (This message was added in version 6.7.0.) in /www/wwwroot/robotattractor/wp-includes/functions.php on line 6114

Notice: Function _load_textdomain_just_in_time was called incorrectly. Translation loading for the updraftplus domain was triggered too early. This is usually an indicator for some code in the plugin or theme running too early. Translations should be loaded at the init action or later. Please see Debugging in WordPress for more information. (This message was added in version 6.7.0.) in /www/wwwroot/robotattractor/wp-includes/functions.php on line 6114
百度Apollo中的A星算法实现笔记 – 机器人家园

  本文主要讲百度Apollo无人车项目中A*算法的C++实现。
  Apollo中有三个地方使用了A*算法,如下。至于为啥不把两个A*算法写成一个统一而且通用的库呢,可能是不方便吧,因为里面有很多细节处理不一样。
  1. 在路由规划模块 routing 中:modules\routing\strategy\a_star_strategy.cc
  2. 在开放场地泊车轨迹规划模块 open_space 中:modules\planning\open_space\coarse_trajectory_generator\hybrid_a_star.cc
  3. 泊车轨迹规划前计算启发代价值:modules\planning\open_space\coarse_trajectory_generator\grid_search.cc

1 A*算法伪代码

  在维基百科上有A*算法的伪代码,比较干净,如下。

    openSet := {start}  //初始化:只把起点放进去
    while openSet is not empty        ①
        current := the node in openSet having the lowest fScore[] value  ②
        if current = goal 
           return reconstruct_path(cameFrom, current)    ③
        openSet.Remove(current)       ④
        for each neighbor of current  ⑤
            tentative_gScore := gScore[current] + d(current, neighbor)
            if tentative_gScore < gScore[neighbor]
                cameFrom[neighbor] := current
                gScore[neighbor] := tentative_gScore
                fScore[neighbor] := gScore[neighbor] + h(neighbor)
                if neighbor not in openSet
                    openSet.add(neighbor)

  大致的流程解释如下:
  ① 判断open是否空 每次一上来都是先检查open列表是不是空。如果是空的后面的就不执行了。你可以认为A*算法的运行过程就是围绕着open列表的操作,先取节点,再删除节点,再扩展节点,再添加节点;
  ② 从open中取出代价最小的节点 如果open列表不是空的,从open列表中取出代价最小的节点,注意是f值最小的,不是g值。一般用优先级队列实现open列表,节点进入优先级队列是要按照代价值大小排序的,优先级大的排在栈顶第一个,所以取最小值的取第一个就行了。优先级可以自己定义,这里定义成代价越小优先级越高就行了;
  ③ 判断是否为目标 每从open列表中取出一个节点(一般管这个节点叫当前节点current),都判断一下它是不是目标节点,如果是那就说明搜索到终点了,根据A星算法的完备性,到终点就意味着找到最短路径了,后面的语句就不再执行了。
  ④ 从open中删除最小代价的节点 如果当前节点不是目标节点,那就检查它的邻居们。但是要先从open列表中删除当前节点。为啥要先删除呢?这就涉及算法的理论基础了,比较麻烦。大概意思是,节点的代价都是从小的传导到大的,如果每次取代价最小的节点,后面的节点代价总是大于当前节点(边的代价都是大于零的),不会存在一条更小的路径再回来通过当前节点,所以删除是一次性的,永远不用再添加回来;如果open列表有两个,就每个都要删除一次。如果用了closed列表,每次从open列表中删除当前节点后总会把它加入到closed列表中,表示后面不再拜访它了,一个节点不可能既在open列表里又在closed列表里。
  ⑤ 检查邻居并计算代价 遍历当前节点的所有邻居节点(遍历的顺序无所谓),并且计算邻居节点的代价。邻居节点可能在open列表里,也可能不在里面。如果在里面,而且这个刚计算出来的代价比已经有的代价小,就用更小的替换了,同时更新他的父节点,也就是说此时找到一条到达这个节点更短的路径了。如果不在里面,就加入到open列表里;上面的伪代码没使用close列表,但Apollo程序中都用了close列表,这样效率高一点(不走回头路),如果邻居节点在close列表中,则直接跳过不处理。
  ⑥ 回到①,循环;

  A*算法的核心概念就是这个open列表了(或者叫集合,都无所谓,因为我们现在只关心它里面有啥,不关心里面元素是按什么顺序排列的),里面存储着已经被发现(但还有待进一步扩展的)节点。伪代码里的open列表——openSet出现了5次,分别是在干什么呢,如下。后面的括号表示Apollo里用哪种数据结构实现的。可见查找都是用普通的容器,取最小元素则用优先队列实现。
  1 判断openSet是不是空(用优先队列判断)
  2 从openSet中取代价最小的元素(用优先队列获取)
  3 从openSet中把代价最小的元素删除
  4 判断代价最小的元素的邻居是不是在openSet里面(用普通的容器实现)
  5 把邻居插入到openSet里面

  算法的逻辑不是很复杂,但是实现起来有很多小细节,非常琐碎,一个个看吧。

2 路由规划模块中的A*算法

  先看路由规划模块routing,算法实现在a_star_strategy.cc中的Search函数里,这个程序跟维基百科伪代码的逻辑是一样的,就连变量的命名也一样。

while (!open_set_detail.empty()) {
    current_node = open_set_detail.top();
    const auto* from_node = current_node.topo_node;
    if (current_node.topo_node == dest_node) {
      if (!Reconstruct(came_from_, from_node, result_nodes)) {
        AERROR << "Failed to reconstruct route.";
        return false;
      }
      return true;
    }
    open_set_.erase(from_node);
    open_set_detail.pop();
    if (closed_set_.count(from_node) != 0) {   continue;  }// if showed before, just skip...
    closed_set_.emplace(from_node);
    // if residual_s is less than FLAGS_min_length_for_lane_change, only move forward
    const auto& neighbor_edges = (GetResidualS(from_node) > FLAGS_min_length_for_lane_change &&  change_lane_enabled_) ? from_node->OutToAllEdge() : from_node->OutToSucEdge();
    double tentative_g_score = 0.0;
    next_edge_set.clear();
    for (const auto* edge : neighbor_edges) {
      sub_edge_set.clear();
      sub_graph->GetSubInEdgesIntoSubGraph(edge, &sub_edge_set);
      next_edge_set.insert(sub_edge_set.begin(), sub_edge_set.end());
    }
    for (const auto* edge : next_edge_set) {
      const auto* to_node = edge->ToNode();
      if (closed_set_.count(to_node) == 1)                                {       continue;      }
      if (GetResidualS(edge, to_node) < FLAGS_min_length_for_lane_change) {       continue;      }
      tentative_g_score = g_score_[current_node.topo_node] + GetCostToNeighbor(edge);
      if (edge->Type() != TopoEdgeType::TET_FORWARD) {     tentative_g_score -= (edge->FromNode()->Cost() + edge->ToNode()->Cost()) / 2;     }
      double f = tentative_g_score + HeuristicCost(to_node, dest_node);
      if (open_set_.count(to_node) != 0 && f >= g_score_[to_node])        {        continue;     }
      g_score_[to_node] = f;
      SearchNode next_node(to_node);
      next_node.f = f;
      open_set_detail.push(next_node);
      came_from_[to_node] = from_node;
      if (open_set_.count(to_node) == 0) {open_set_.insert(to_node);}
    }
  }

  首先用一种叫“优先队列”的数据结构std::priority_queue实现了一个open列表:open_set_detailpriority_queue默认是大顶堆,即值最大的元素排在最顶部。 这里改写了,因为节点的值代表代价,我们希望代价最小的排在最前面。

std::priority_queue open_set_detail;

  然后在头文件里又有一个open列表(和closed列表),但使用的却是unordered_set无序集合容器。

std::unordered_set open_set_;
std::unordered_set closed_set_;

  为什么要使用两个open列表呢?
  在伪代码中只使用了一个open列表。但是在编程实现时采用了两个:open_set_detail常用于取代价最小(优先级最高)的节点,因为对于优先队列这种数据结构,元素是按照优先级排列的,优先级最高的元素就被放在第一个的位置,所以取它当然就非常快了,是O(1)时间复杂度,但是std::priority_queue不支持快速查询里面是不是有某个元素。而open_set_主要用来查询,即查询里面有没有某个节点。unordered_set的优点是能够直接使用元素索引值快速查询(用hash实现的)。从功能上看其实用一个open列表就可以实现,用两个是为了提高效率。从元素的角度来说,open_set_detail列表存储的是SearchNode类型的元素,SearchNode类型是一种复合的元素,主要包括TopoNode节点和该节点的f函数值,用于基于值的存取;open_set_列表中存储的只是TopoNode元素,因此它只用于查询。
  A*搜索结束后,如何抽取出路径来呢?这是用Reconstruct函数实现的,它从目标节点开始往回倒推得到一系列的节点组成的路径。回溯用了came_from_这个容器,它存储着每个节点的父节点。每个父节点有不只一个子节点,而每个子节点只有一个父节点,所以这就是一颗树。对于一颗树来说,知道了子节点,沿着的它的父节点、爷爷节点……往回找总能到树根(就是起始节点)。
  路由规划使用的启发函数是曼哈顿距离,而不是欧式距离。这在城市网格地图中比较合理,深究原因涉及到背后的度量的概念。Dijkstra和A*都只能搜索离散图上的最短距离路径,不能搜索实际的真正欧式最短距离路径。

  double distance = fabs(src_point.x() - dest_point.x()) + fabs(src_point.y() - dest_point.y());

  有些地方似乎有问题:
  刚pop出来的当前节点就判断是否在closed_set_中似乎没有必要。
  代价函数g与标准算法不一样,为什么要让g_score_[to_node] = f呢?这相当于加了两次h启发函数,好像不对。而且怎么让f与g值去比较呢(f >= g_score_[to_node]),这有什么意义?在较早的Apollo1.5版中还是g_score_[to_node] = tentative_g_score;,后面的版本就改了,不懂。

3 开放场地轨迹规划中的A*算法

  再看开放场地轨迹规划模块open_space中的实现,在Plan函数中,也定义了一个std::priority_queue类型的open_pq_优先队列。

std::priority_queue, std::vector>, cmp> open_pq_;

  然后又有一个open列表(以及closed列表,还写成了close,估计这两个程序不是一个人写的):

std::unordered_map> open_set_;
std::unordered_map> close_set_;

  注意这里是unordered_map,而前面的路由规划模块是unordered_setopen_set_列表存储的是Node3d和相应的index。
  hybrid_a_star.cc 中回溯路径的函数是GetResult,没单独定义came_from_,而是直接在每个节点中存储了父节点的指针。
  hybrid_a_star.cc比标准的A*算法多了个AnalyticExpansion(current_node)检测,也就是检测当前节点是不是能直接用Reeds-Shepp曲线连接目标并且没有与障碍物碰撞,如果可以就不搜索了,这是一种偷懒的技巧。其它的部分基本一样,不用解释就能看懂。

while (!open_pq_.empty()) {
    const std::string current_id = open_pq_.top().first;// take out the lowest cost neighboring node
    open_pq_.pop();
    std::shared_ptr current_node = open_set_[current_id];
    // check if an analystic curve could be connected from current configuration to the end configuration without collision. if so, search ends.
    if (AnalyticExpansion(current_node))                              {   break;     }
    close_set_.insert(std::make_pair(current_node->GetIndex(), current_node));
    for (size_t i = 0; i < next_node_num_; ++i) {
      std::shared_ptr next_node = Next_node_generator(current_node, i);
      if (next_node == nullptr)                                       {   continue;   }// boundary check failure handle
      if (close_set_.find(next_node->GetIndex()) != close_set_.end()) {   continue;   }// check if the node is already in the close set
      if (!ValidityCheck(next_node))                                  {   continue;   }// 碰撞检测
      if (open_set_.find(next_node->GetIndex()) == open_set_.end()) {
        CalculateNodeCost(current_node, next_node);
        open_set_.emplace(next_node->GetIndex(), next_node);
        open_pq_.emplace(next_node->GetIndex(), next_node->GetCost());
      }
    }
  }

  Apollo在open_space轨迹规划模块中采用的是OBCA算法,但是代码中似乎有一些错误,例如遗漏了一个地方(在OBCA原程序hybrid_a_star.jl中),每次pop出栈顶元素后没有把open列表中的值删除掉:delete!(open, c_id)。对邻居节点只判断了一种情况,即邻居节点不在open列表里就添加进去,但是如果已经在open列表里了怎么办呢,程序没有考虑这种情况。当然 open_space模块还在测试中,有问题也正常。
  OBCA原程序hybrid_a_star.jl中回溯路径用的方式借助closed,每个节点也存储了父节点(的索引),再根据索引从closed里找节点。

  看这些代码非常累,因为同一个算法不同程序员实现的方式真是五花八门。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注