本文主要讲百度Apollo无人车项目中A*算法的C++实现。
Apollo中有三个地方使用了A*算法,如下。至于为啥不把两个A*算法写成一个统一而且通用的库呢,可能是不方便吧,因为里面有很多细节处理不一样。
1. 在路由规划模块 routing 中:modules\routing\strategy\a_star_strategy.cc
2. 在开放场地泊车轨迹规划模块 open_space 中:modules\planning\open_space\coarse_trajectory_generator\hybrid_a_star.cc
3. 泊车轨迹规划前计算启发代价值:modules\planning\open_space\coarse_trajectory_generator\grid_search.cc
1 A*算法伪代码
在维基百科上有A*算法的伪代码,比较干净,如下。
openSet := {start} //初始化:只把起点放进去
while openSet is not empty ①
current := the node in openSet having the lowest fScore[] value ②
if current = goal
return reconstruct_path(cameFrom, current) ③
openSet.Remove(current) ④
for each neighbor of current ⑤
tentative_gScore := gScore[current] + d(current, neighbor)
if tentative_gScore < gScore[neighbor]
cameFrom[neighbor] := current
gScore[neighbor] := tentative_gScore
fScore[neighbor] := gScore[neighbor] + h(neighbor)
if neighbor not in openSet
openSet.add(neighbor)
大致的流程解释如下:
① 判断open是否空 每次一上来都是先检查open列表是不是空。如果是空的后面的就不执行了。你可以认为A*算法的运行过程就是围绕着open列表的操作,先取节点,再删除节点,再扩展节点,再添加节点;
② 从open中取出代价最小的节点 如果open列表不是空的,从open列表中取出代价最小的节点,注意是f值最小的,不是g值。一般用优先级队列实现open列表,节点进入优先级队列是要按照代价值大小排序的,优先级大的排在栈顶第一个,所以取最小值的取第一个就行了。优先级可以自己定义,这里定义成代价越小优先级越高就行了;
③ 判断是否为目标 每从open列表中取出一个节点(一般管这个节点叫当前节点current
),都判断一下它是不是目标节点,如果是那就说明搜索到终点了,根据A星算法的完备性,到终点就意味着找到最短路径了,后面的语句就不再执行了。
④ 从open中删除最小代价的节点 如果当前节点不是目标节点,那就检查它的邻居们。但是要先从open列表中删除当前节点。为啥要先删除呢?这就涉及算法的理论基础了,比较麻烦。大概意思是,节点的代价都是从小的传导到大的,如果每次取代价最小的节点,后面的节点代价总是大于当前节点(边的代价都是大于零的),不会存在一条更小的路径再回来通过当前节点,所以删除是一次性的,永远不用再添加回来;如果open列表有两个,就每个都要删除一次。如果用了closed列表,每次从open列表中删除当前节点后总会把它加入到closed列表中,表示后面不再拜访它了,一个节点不可能既在open列表里又在closed列表里。
⑤ 检查邻居并计算代价 遍历当前节点的所有邻居节点(遍历的顺序无所谓),并且计算邻居节点的代价。邻居节点可能在open列表里,也可能不在里面。如果在里面,而且这个刚计算出来的代价比已经有的代价小,就用更小的替换了,同时更新他的父节点,也就是说此时找到一条到达这个节点更短的路径了。如果不在里面,就加入到open列表里;上面的伪代码没使用close列表,但Apollo程序中都用了close列表,这样效率高一点(不走回头路),如果邻居节点在close列表中,则直接跳过不处理。
⑥ 回到①,循环;
A*算法的核心概念就是这个open列表了(或者叫集合,都无所谓,因为我们现在只关心它里面有啥,不关心里面元素是按什么顺序排列的),里面存储着已经被发现(但还有待进一步扩展的)节点。伪代码里的open列表——openSet出现了5次,分别是在干什么呢,如下。后面的括号表示Apollo里用哪种数据结构实现的。可见查找都是用普通的容器,取最小元素则用优先队列实现。
1 判断openSet是不是空(用优先队列判断)
2 从openSet中取代价最小的元素(用优先队列获取)
3 从openSet中把代价最小的元素删除
4 判断代价最小的元素的邻居是不是在openSet里面(用普通的容器实现)
5 把邻居插入到openSet里面
算法的逻辑不是很复杂,但是实现起来有很多小细节,非常琐碎,一个个看吧。
2 路由规划模块中的A*算法
先看路由规划模块routing,算法实现在a_star_strategy.cc
中的Search
函数里,这个程序跟维基百科伪代码的逻辑是一样的,就连变量的命名也一样。
while (!open_set_detail.empty()) {
current_node = open_set_detail.top();
const auto* from_node = current_node.topo_node;
if (current_node.topo_node == dest_node) {
if (!Reconstruct(came_from_, from_node, result_nodes)) {
AERROR << "Failed to reconstruct route.";
return false;
}
return true;
}
open_set_.erase(from_node);
open_set_detail.pop();
if (closed_set_.count(from_node) != 0) { continue; }// if showed before, just skip...
closed_set_.emplace(from_node);
// if residual_s is less than FLAGS_min_length_for_lane_change, only move forward
const auto& neighbor_edges = (GetResidualS(from_node) > FLAGS_min_length_for_lane_change && change_lane_enabled_) ? from_node->OutToAllEdge() : from_node->OutToSucEdge();
double tentative_g_score = 0.0;
next_edge_set.clear();
for (const auto* edge : neighbor_edges) {
sub_edge_set.clear();
sub_graph->GetSubInEdgesIntoSubGraph(edge, &sub_edge_set);
next_edge_set.insert(sub_edge_set.begin(), sub_edge_set.end());
}
for (const auto* edge : next_edge_set) {
const auto* to_node = edge->ToNode();
if (closed_set_.count(to_node) == 1) { continue; }
if (GetResidualS(edge, to_node) < FLAGS_min_length_for_lane_change) { continue; }
tentative_g_score = g_score_[current_node.topo_node] + GetCostToNeighbor(edge);
if (edge->Type() != TopoEdgeType::TET_FORWARD) { tentative_g_score -= (edge->FromNode()->Cost() + edge->ToNode()->Cost()) / 2; }
double f = tentative_g_score + HeuristicCost(to_node, dest_node);
if (open_set_.count(to_node) != 0 && f >= g_score_[to_node]) { continue; }
g_score_[to_node] = f;
SearchNode next_node(to_node);
next_node.f = f;
open_set_detail.push(next_node);
came_from_[to_node] = from_node;
if (open_set_.count(to_node) == 0) {open_set_.insert(to_node);}
}
}
首先用一种叫“优先队列”的数据结构std::priority_queue
实现了一个open列表:open_set_detail
。priority_queue
默认是大顶堆,即值最大的元素排在最顶部。 这里改写了,因为节点的值代表代价,我们希望代价最小的排在最前面。
std::priority_queue open_set_detail;
然后在头文件里又有一个open列表(和closed列表),但使用的却是unordered_set
无序集合容器。
std::unordered_set open_set_;
std::unordered_set closed_set_;
为什么要使用两个open列表呢?
在伪代码中只使用了一个open列表。但是在编程实现时采用了两个:open_set_detail
常用于取代价最小(优先级最高)的节点,因为对于优先队列这种数据结构,元素是按照优先级排列的,优先级最高的元素就被放在第一个的位置,所以取它当然就非常快了,是O(1)时间复杂度,但是std::priority_queue
不支持快速查询里面是不是有某个元素。而open_set_
主要用来查询,即查询里面有没有某个节点。unordered_set
的优点是能够直接使用元素索引值快速查询(用hash实现的)。从功能上看其实用一个open列表就可以实现,用两个是为了提高效率。从元素的角度来说,open_set_detail
列表存储的是SearchNode
类型的元素,SearchNode
类型是一种复合的元素,主要包括TopoNode
节点和该节点的f
函数值,用于基于值的存取;open_set_
列表中存储的只是TopoNode
元素,因此它只用于查询。
A*搜索结束后,如何抽取出路径来呢?这是用Reconstruct
函数实现的,它从目标节点开始往回倒推得到一系列的节点组成的路径。回溯用了came_from_
这个容器,它存储着每个节点的父节点。每个父节点有不只一个子节点,而每个子节点只有一个父节点,所以这就是一颗树。对于一颗树来说,知道了子节点,沿着的它的父节点、爷爷节点……往回找总能到树根(就是起始节点)。
路由规划使用的启发函数是曼哈顿距离,而不是欧式距离。这在城市网格地图中比较合理,深究原因涉及到背后的度量的概念。Dijkstra和A*都只能搜索离散图上的最短距离路径,不能搜索实际的真正欧式最短距离路径。
double distance = fabs(src_point.x() - dest_point.x()) + fabs(src_point.y() - dest_point.y());
有些地方似乎有问题:
刚pop出来的当前节点就判断是否在closed_set_
中似乎没有必要。
代价函数g与标准算法不一样,为什么要让g_score_[to_node] = f
呢?这相当于加了两次h启发函数,好像不对。而且怎么让f与g值去比较呢(f >= g_score_[to_node]
),这有什么意义?在较早的Apollo1.5版中还是g_score_[to_node] = tentative_g_score;
,后面的版本就改了,不懂。
3 开放场地轨迹规划中的A*算法
再看开放场地轨迹规划模块open_space中的实现,在Plan
函数中,也定义了一个std::priority_queue
类型的open_pq_
优先队列。
std::priority_queue, std::vector>, cmp> open_pq_;
然后又有一个open列表(以及closed列表,还写成了close,估计这两个程序不是一个人写的):
std::unordered_map> open_set_;
std::unordered_map> close_set_;
注意这里是unordered_map
,而前面的路由规划模块是unordered_set
。open_set_
列表存储的是Node3d
和相应的index。
hybrid_a_star.cc
中回溯路径的函数是GetResult
,没单独定义came_from_
,而是直接在每个节点中存储了父节点的指针。
hybrid_a_star.cc
比标准的A*算法多了个AnalyticExpansion(current_node)
检测,也就是检测当前节点是不是能直接用Reeds-Shepp曲线连接目标并且没有与障碍物碰撞,如果可以就不搜索了,这是一种偷懒的技巧。其它的部分基本一样,不用解释就能看懂。
while (!open_pq_.empty()) {
const std::string current_id = open_pq_.top().first;// take out the lowest cost neighboring node
open_pq_.pop();
std::shared_ptr current_node = open_set_[current_id];
// check if an analystic curve could be connected from current configuration to the end configuration without collision. if so, search ends.
if (AnalyticExpansion(current_node)) { break; }
close_set_.insert(std::make_pair(current_node->GetIndex(), current_node));
for (size_t i = 0; i < next_node_num_; ++i) {
std::shared_ptr next_node = Next_node_generator(current_node, i);
if (next_node == nullptr) { continue; }// boundary check failure handle
if (close_set_.find(next_node->GetIndex()) != close_set_.end()) { continue; }// check if the node is already in the close set
if (!ValidityCheck(next_node)) { continue; }// 碰撞检测
if (open_set_.find(next_node->GetIndex()) == open_set_.end()) {
CalculateNodeCost(current_node, next_node);
open_set_.emplace(next_node->GetIndex(), next_node);
open_pq_.emplace(next_node->GetIndex(), next_node->GetCost());
}
}
}
Apollo在open_space轨迹规划模块中采用的是OBCA算法,但是代码中似乎有一些错误,例如遗漏了一个地方(在OBCA原程序hybrid_a_star.jl中),每次pop出栈顶元素后没有把open列表中的值删除掉:delete!(open, c_id)
。对邻居节点只判断了一种情况,即邻居节点不在open列表里就添加进去,但是如果已经在open列表里了怎么办呢,程序没有考虑这种情况。当然 open_space模块还在测试中,有问题也正常。
OBCA原程序hybrid_a_star.jl中回溯路径用的方式借助closed
,每个节点也存储了父节点(的索引),再根据索引从closed
里找节点。
看这些代码非常累,因为同一个算法不同程序员实现的方式真是五花八门。