NTTドコモR&Dの技術ブログです。

位置情報から移動経路推定してみた

こんにちは。NTTドコモ サービスイノベーション部の大澤です。普段は位置情報をメインとしたデータ活用に関する業務をしています。 この記事ではPythonを用いて、移動した経路を位置情報から推定する方法について紹介したいと思います。

概要

突然ですが、図1のような地図データがあるとします。n0とかn1は交差点を示していると思ってください。 赤いピンのように位置情報が時系列的に取得されたとして、この時どの経路を移動していたかわかりますか?

図1. 地図データと位置情報

なんとなく、「n1→n4→n5→n6→n7」の順で移動したのではと予測できるのではないでしょうか。

図2. 地図データと位置情報から推定される移動経路

この記事ではマップマッチングという技術を使って、上記のように位置情報と地図データから移動経路を推定してみたいと思います。

移動経路を推定することで、

  • 交通量調査: 多くのユーザの位置情報を用いて交通量を測定すれば、交通計画に活用ができます。
  • マーケティング活用: 店舗周辺の道路をよく通る人を特定して、広告配信して集客を狙うといった行動変容にも使うことができます。
  • DOOH広告効果測定: 移動経路から屋外広告設置箇所を通った人を特定して、接触者がその後購買等に繋がったかを分析できれば広告効果の測定ができそうです。道路に限らず電車移動を推定できれば、電車内広告に接触した人もわかります。
  • 工事区間や災害エリアをよく通る人への迂回ルート提案: 工事区間をよく通る人に向けて迂回ルートを提案することで渋滞緩和やドライバーの満足度向上を期待できます。

というように幅広い場面で活用の可能性が見えてきます。

マップマッチングとは?

マップマッチングとは、GPSのような位置情報データと地図データ(道路をエッジ、交差点をノードとしたグラフ構造データ)を用いて移動経路を推定する技術です。ここからは具体的にどのように移動経路を推定するのか説明していきます。

移動経路推定方法

1. データの準備: 地図データのノードとエッジの作成

マップマッチングは位置情報と地図データを用いて移動経路を推定する技術なので、まずはそれぞれのデータを用意する必要があります。 説明簡易化のため、サンプルとして概要の中でお見せした地図データを用いていこうと思います。

1.1 データ定義

import pandas as pd

# 位置情報
df_trajs = pd.DataFrame(
    [
        ["2024-12-01 10:00:00", 35.004, 140.0005],
        ["2024-12-01 11:00:00", 35.002, 140.0015],
        ["2024-12-01 12:00:00", 35.000, 140.0008],
        ["2024-12-01 13:00:00", 35.000, 140.0035],
    ],
    columns=["log_time", "lat", "lng"]
)

# 地図データのノード
df_nodes = pd.DataFrame(
    [
        ["n0", 35.004, 140.000],
        ["n1", 35.004, 140.001],
        ["n2", 35.003, 140.000],
        ["n3", 35.002, 140.000],
        ["n4", 35.002, 140.001],
        ["n5", 35.000, 140.001],
        ["n6", 35.000, 140.002],
        ["n7", 35.000, 140.003],
    ],
    columns=["node_id", "lat", "lng"]
)

# 地図データのノード間の接続情報(道路)
df_edges = pd.DataFrame(
    [
        ["n1", "n0", 1],
        ["n0", "n2", 1],
        ["n2", "n3", 1],
        ["n1", "n4", 2],
        ["n3", "n4", 1],
        ["n4", "n5", 1],
        ["n4", "n7", 2],
        ["n5", "n6", 1],
        ["n6", "n7", 1],
    ],
    columns=["node_id1", "node_id2", "distance"]
)

位置情報やノードの緯度経度はダミーで設定しています。また、ノード間の距離もダミーで適当な値に設定しています。 ここでは地図データを有向グラフとして考えたいと思います。 車道は一方向にしか移動できないため上下線でエッジを分けた有向グラフにしたり、歩道は向きに関係なく移動が可能であるため無向グラフにするなど工夫は考えられます。

1.2 地図データの可視化

どのようなグラフ構造になっているのかを確認するため、作ったグラフをPython上で可視化してみたいと思います。

import matplotlib.pyplot as plt
import networkx as nx

# グラフ作成
di_graph = nx.DiGraph()

# ノード追加
for _, row in df_nodes.iterrows():
    di_graph.add_node(row["node_id"], pos=(row["lng"], row["lat"]))

# エッジ追加
for _, row in df_edges.iterrows():
    di_graph.add_edge(row["node_id1"], row["node_id2"])

# ノード位置を取得
pos = nx.get_node_attributes(di_graph, 'pos')

# グラフ描画
plt.figure(figsize=(15, 15))
nx.draw(
    di_graph, pos, with_labels=True, node_size=1000, node_color="lightblue", font_size=20, font_color="black"
)
plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.grid(True)
plt.show()

実行すると以下のようなグラフが可視化できると思います。概要にある地図データと同じような構造になっていますね。

図3. Python上で地図データの可視化

2. ノードの空間インデックス化

データも揃ったところで移動経路推定に移りたいのですが、その前に下準備をしておきます。 マップマッチングでは位置情報が測位された場所周辺のノードを通ったとして移動経路を推定します。つまり、位置情報から通ったと思われるノードを抽出する必要があります。 どうやって位置情報からその周辺のノードを抽出するかなのですが、例えば位置情報と各ノードの緯度経度から距離計算をして一定範囲内のノードを抽出すれば良さそうです。ただ地図データというのは本来膨大なサイズのグラフデータになっているので、1つ1つ計算をしていると推定にかなり時間がかかってしまいます。 そこで、空間インデックスを作っておくことで高速にノード抽出を行えるようにします。

import numpy as np
from scipy.spatial import cKDTree

def latlng_to_cartesian(lat, lon):
    """
    緯度経度を地球表面の3D座標に変換
    """
    EARTH_RADIUS = 6371000
    lat, lon = np.radians(lat), np.radians(lon)
    x = EARTH_RADIUS * np.cos(lat) * np.cos(lon)
    y = EARTH_RADIUS * np.cos(lat) * np.sin(lon)
    z = EARTH_RADIUS * np.sin(lat)
    return np.array([x, y, z])

node_cartesian = np.array([latlng_to_cartesian(lat, lng) for lat, lng in df_nodes[["lat", "lng"]].values])
tree = cKDTree(node_cartesian)

scipyライブラリのcKDTreeクラスを用いることで、KD木によって地図データのノードを空間インデックス化しています。KD木では木構造を用いて階層的にデータを分割するため、探索範囲を絞って効率的にノード探索することが可能です。 cKDTreeの内部の距離関数にはユークリッド距離が用いられるので、ノードの緯度経度もそれに合わせて3次元のユークリッド空間上の座標に変換しています。

3. マップマッチングによる移動経路推定

いよいよマップマッチングによる移動経路推定です。

3.1 地図データのグラフ化

推定を行う上で地図データをグラフとして保持しておきます。

from collections import defaultdict

map_graph = defaultdict(dict)
for node_id1, node_id2, distance in df_edges.values:
    map_graph[node_id1][node_id2] = distance

3.2 位置情報からノードの抽出

先述の通り、マップマッチングでは位置情報が測位された場所周辺のノードを通ったとして移動経路を推定します。ここでは先ほど作った空間インデックスを用いて、通ったと思われるノードを抽出していきます。

# 位置情報を3D座標に変換
traj_cartesian = [latlng_to_cartesian(lat, lng) for lat, lng in df_trajs[["lat", "lng"]].values]
# 各位置情報の半径70m以内に存在するノードのインデックスを抽出
RADIUS = 70
emitted_node_indices = tree.query_ball_point(traj_cartesian, RADIUS)

ノードの空間インデックスは3次元ユークリッド空間の座標に変換したものを使っていたので、位置情報についても同様に変換した値を使います。 cKDTreeのquery_ball_pointメソッドを使うと、指定座標から指定範囲内(今回は70m)に存在するノードのインデックスを取得できます。

実際に抽出されたノードを見てみると以下のようになっています。

print(emitted_node_indices)
# [list([0, 1]) list([4]) list([5]) list([7])]

指定範囲内のノードということで複数のノードが抽出されることがあります。 位置情報から最も近いノードのみ抽出すれば良いように感じますが、位置情報に測位誤差があって本来いた場所から少し離れた場所で位置情報が測位されたというケースに対応するためこのようにしています。例えば以下のようなケースがあった時、1番目の位置情報がn0かn1かで推定される移動経路が変わってきそうです。 その場合、全く違う経路を移動していたと誤推定する可能性があるため、誤差も考慮してノード抽出を行なっているということですね。

図4. 位置情報から抽出されたノード

3.3 移動経路推定用グラフのエッジ作成

移動経路を推定する際は、各位置情報から抽出されたノードからノードへの移動を考慮して、それぞれのノードをエッジで繋いだ後、尤もらしい移動経路を見つけてあげれば良いです。 例えば、1番目の位置情報からはn0とn1、2番目の位置情報からはn4が通った可能性のあるノードとして抽出されています。つまり、n0→n4とn1→n4の経路を考えてエッジで繋ぎます。 i番目の位置情報から抽出されたノード数をmi、i+1番目の位置情報から抽出されたノード数をmi+1とした場合、エッジ数はmi×mi+1だけできます。ノード抽出の際、誤差を考慮して多くのノードを抽出すれば精度は上がる可能性がありますが、計算時間がO(n2)に比例して大きくなるトレードオフの関係にあるため、ノード抽出範囲をバランスの良い値に設定する必要があります。

図5. 推定用グラフ
エッジのコストについては、例えば交通量が多さ、道路の広さ、法定速度などを基準に通った確率の高い道路のコストを小さくするなど工夫が考えられます。 ただ、今回は地図データ上の距離をそのまま使おうと思います。また、エッジはノード間を寄り道せず最短経路で移動したと考えたいと思います。

import itertools
import heapq

def dijkstra(graph, start, goal):
    """
    ダイクストラ法による最短経路探索
    """
    # 優先度キューを初期化
    queue = [(0, start, [])]  # (累積距離, 現在のノード, 経路)
    visited = set()

    while queue:
        (cost, node, path) = heapq.heappop(queue)

        if node in visited:
            continue

        visited.add(node)
        path = path + [node]

        # ゴールに到達した場合
        if node == goal:
            return cost, path

        # 隣接ノードをキューに追加
        for neighbor, weight in graph.get(node, {}).items():
            if neighbor not in visited:
                heapq.heappush(queue, (cost + weight, neighbor, path))

    return np.inf, []  # ゴールに到達できない場合


# 推定用グラフ
est_graph = defaultdict(dict)
# 中間経路保存用の辞書
shortest_path_dict = defaultdict(dict)

# 位置情報からノードが抽出されなかった場合は削除
emitted_node_indices = [indices for indices in emitted_node_indices if indices]
# 隣り合う位置情報間で抽出されたノードを接続
for indices1, indices2 in zip(emitted_node_indices[:-1], emitted_node_indices[1:]):
    for node_id1, node_id2 in itertools.product(df_nodes.iloc[indices1]["node_id"], df_nodes.iloc[indices2]["node_id"]):
        # 抽出されたノード間を移動した場合、どれくらいの距離移動したか、どのような経路を通っていたか
        min_distance, shortest_path = dijkstra(map_graph, node_id1, node_id2)
        # node_id1からnode_id2に移動した場合の距離を設定
        est_graph[node_id1][node_id2] = min_distance
        # 中間経路を補間できるように保存
        shortest_path_dict[node_id1][node_id2] = shortest_path
        
# スタートとゴールを示すダミーノードを追加(距離は0で設定)
for node_id in df_nodes.iloc[emitted_node_indices[0]]["node_id"]:
    est_graph["start"][node_id] = 0
for node_id in df_nodes.iloc[emitted_node_indices[-1]]["node_id"]:
    est_graph[node_id]["end"] = 0

ここではダイクストラ法を使って最短距離と最短経路を求めています。ノード間の遷移には先述の通り、最短経路の距離を設定しています。est_graphの中にはあくまで位置情報から抽出されたノードしか入っていないので、移動経路を推定した後に地図データ上のノードのつながりと合うようにノードを補間するため最短経路についても保持しています。ノードの補間については後述します。 また、推定用のグラフの最初と最後にはダミーノードとしてstartとendを追加しています。経路探索をする上で開始ノードと終了ノードを指定する必要があるためこれらを設定しています。最終的に出来上がったグラフは以下のようになります。

図6. ダミーノードを追加した後の最終的な推定用グラフ

3.4 移動経路推定

推定用のグラフが完成したら、いよいよ移動経路の推定です。 推定用グラフではstartを始点、endを終点とするグラフができており、それぞれのエッジは位置情報間の中で移動したと思われるノードの遷移を示しています。つまり、startからendまでの最短経路を探索することで、寄り道となるようなノードを選ばず、最も効率的に移動するための経路がわかります。 ここでもダイクストラ法を用いて最短経路を探索しようと思います。

# 最短経路探索
_, estimated_path = dijkstra(est_graph, "start", "end")
# スタート、ゴールのダミーノードは削除
estimated_path = estimated_path[1:-1]
# 位置情報ごとに通過したと思われるノード
print(estimated_path)
# ['n1', 'n4', 'n5', 'n7']

位置情報ごとに通過したと思われるノードが推定されました。 特に1番目の位置情報についてはn0かn1どちらを通っていたかわかりませんでしたが、全体を通しで見た時にはn1を通っていたという推定になったということがわかります。

4. 移動経路の補間

これまでの処理で、それぞれの位置情報のタイミングでどのノードを通っていたかはわかりましたが、まだ完全な経路ではありません。特にn5→n7の移動では間にn6があります。推定用のエッジ作成の際、ノード間の最短経路についても保存していました。これを用いて移動経路の補間をしていきます。 まず、それぞれのノード間の最短経路について見てみましょう。

print(shortest_path_dict)
# {'n0': {'n4': ['n0', 'n2', 'n3', 'n4']}, 'n1': {'n4': ['n1', 'n4']}, 'n4': {'n5': ['n4', 'n5']}, 'n5': {'n7': ['n5', 'n6', 'n7']}}
print(shortest_path_dict["n0"]["n4"])
# ['n0', 'n2', 'n3', 'n4']
print(shortest_path_dict["n5"]["n7"])
# ['n5', 'n6', 'n7']

例えばn0からn4への移動では、「n0→n2→n3→n4」と行くのが最短経路のようです。また、n5からn7へは「n5→n6→n7」が最短経路です。地図データを見てもそうなっていますね。これらを用いて補間処理を書くと以下の通りになります。

interpolated_path = []
for node_id1, node_id2 in zip(estimated_path[:-1], estimated_path[1:]):
    # node_id1→node_id2を移動する最短経路をロード
    interpolated_path += shortest_path_dict[node_id1][node_id2][:-1] # ノードの重複が起こらないように最後だけは削除
# 重複削除の対応として最後のノードだけは追加
interpolated_path.append(estimated_path[-1])
print(interpolated_path)
# ['n1', 'n4', 'n5', 'n6', 'n7']

最終的な経路は「n1→n4→n5→n6→n7」になりました。概要でお見せした推定ができました。

まとめ

この記事では位置情報と地図データから移動経路を推定する方法をお伝えしました。今回の記事では触れませんでしたが、地図データという大きなデータを扱う上での工夫や精度向上方法として以下が考えられます。

  • ノード抽出方法の工夫: 位置情報に近い方が通った確率は高いため、複数ノード抽出した場合でも位置情報からの距離に応じてコストを設定することで更なる精度向上が考えられます。
  • ノード間の最短距離, 最短経路の求め方: ダイクストラ法で計算する以外にも事前に最短経路を保持しておく方式を用いることで計算時間を大幅に削減することができます。
  • 分散処理: 扱うデータサイズが大きいことからマップマッチングによる移動経路推定には時間がかかります。多くのユーザ基盤を保持している場合にも短時間で処理できるよう、分散処理の技術を取り入れることで高速に推定を行うことができます。
  • 地図グラフの最適化: ノードの接続方法やコスト設定などグラフについても最適化することで精度向上につながります。

移動経路を推定することでマーケティングにおける活用や交通量の調査など幅広い活用が考えられることから、位置情報に関する研究の今後の発展に期待しています。 最後までご覧いただきありがとうございました。