24 KiB
Setting up the database¶
Source data takes a lot of space, and might overwhelm the amount of RAM available on the computer. For this reason, we will use a database to work with the source data, so we can offload a bunch of the data from RAM to disk. The included compose.yaml file contains docker instructions to set up a PostgreSQL database with PostGIS. Run it first using
sudo docker compose up -d
The next cell creates the tables used in the database. This requires the psycopg2 library.
import psycopg2
def initialize_database():
    """Create the PostGIS-backed ``nodes`` and ``edges`` tables if they do not exist.

    Connects to the local PostgreSQL instance defined in compose.yaml.
    The ``no_loops`` constraint forbids self-edges, and the ``no_repeats``
    unique index forbids storing the same unordered node pair twice.
    """
    connection = psycopg2.connect(database='database', host='localhost', user='database', password='database', port=5432)
    try:
        # "with connection" commits the transaction on success — it does NOT
        # close the connection, hence the explicit close in the finally clause
        with connection:
            with connection.cursor() as cursor:
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS nodes(
                        id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
                        position GEOMETRY(PointZ) NOT NULL
                    );
                ''')
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS edges(
                        id INTEGER GENERATED ALWAYS AS IDENTITY,
                        start_point_id INTEGER NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
                        end_point_id INTEGER NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
                        motorway BOOLEAN NOT NULL,
                        speed_limit INTEGER NOT NULL,
                        tunnel BOOLEAN NOT NULL,
                        passable_same_direction BOOLEAN NOT NULL,
                        passable_opposite_direction BOOLEAN NOT NULL,
                        CONSTRAINT no_loops CHECK (start_point_id IS DISTINCT FROM end_point_id)
                    );
                    CREATE UNIQUE INDEX IF NOT EXISTS no_repeats ON edges (LEAST(start_point_id, end_point_id), GREATEST(start_point_id, end_point_id));
                ''')
    finally:
        connection.close()
Constants¶
# Combine nodes closer than this distance from each other into the same node (in metres)
MINIMUM_NODE_DISTANCE = 1
# If we don't have a speed limit value in our data, use this as default
# (0 means "unknown"; the packed file stores speed_limit / 10 in a 4-bit field)
DEFAULT_SPEED_LIMIT = 0
# Support reading other data than just UTM33 for Norway
# (written at the end of the output file; readers without it assume EPSG:32633)
SOURCE_DATA_EPSG_CODE = "EPSG:32633"
Reading source data¶
Next, we need to read source data. The Norwegian Elveg 2.0 data is available in both SOSI and GML formats. I downloaded it in GML, since SOSI kind of seems old-fashioned. If you want to read other data types, write another conversion function below. Note that it is assumed that the units of the X, Y and Z coordinates are metres, so if necessary, convert the data into a metric coordinate system such as UTM.
from dataclasses import dataclass
from typing import Optional
import xml.etree.ElementTree as ET
import re
@dataclass
class Node:
    """A single road-network point read from a source file."""
    # File-local string ID, built from road link number and point index
    node_id: str
    position_x: float
    position_y: float
    # Altitude; may be None or implausible (< -3000 m) in the source data and
    # is corrected when the node is stored to the database
    position_z: Optional[float]
@dataclass
class Edge:
    """A directed road segment between two nodes, with its traffic attributes."""
    # File-local node IDs of the two endpoints
    start_point_id: str
    end_point_id: str
    # Speed limit in km/h; DEFAULT_SPEED_LIMIT when unknown
    speed_limit: int
    motorway: bool
    tunnel: bool
    # One-way information: whether the edge can be traversed along, respectively
    # against, its defined direction
    passable_same_direction: bool
    passable_opposite_direction: bool
@dataclass
class LoadedFile:
    """The parsed contents of one source file: the road graph plus its bounding box."""
    nodes: list[Node]
    edges: list[Edge]
    # (min, max) coordinate ranges covered by the file's nodes
    x_extent: tuple[float, float]
    y_extent: tuple[float, float]
def read_gml_file(file_location: str, print_progress: bool = False) -> LoadedFile:
    """Parse one Elveg 2.0 GML file into an in-memory road graph.

    Road links of type stairs, car ferry or passenger ferry are skipped.
    Each coordinate of a kept road link becomes a Node, and consecutive
    coordinates are connected by Edges carrying the link's attributes.

    Parameters:
        file_location: path to the GML file to parse.
        print_progress: when True, print an in-place progress indicator.

    Returns:
        A LoadedFile with all nodes, edges and the file's x/y bounding box.
    """
    nodes: list[Node] = []
    edges: list[Edge] = []
    x_extent = (1e99, -1e99)
    y_extent = (1e99, -1e99)
    geons = 'http://skjema.geonorge.no/SOSI/produktspesifikasjon/Elveg/2.0'
    gmlns = 'http://www.opengis.net/gml/3.2'
    tree = ET.parse(file_location)
    root = tree.getroot()
    # Compile once: lane codes start with a number, possibly followed by letters
    lane_number_pattern = re.compile(r'^([0-9]+).*$')
    # First pass: collect which road links are motorways and their speed limits,
    # keyed by the link's local ID
    motorways: set[str] = set()
    speed_limits: dict[str, int] = {}
    for motorway_element in root.findall(f'.//{{{geons}}}Motorveg'):
        local_id = motorway_element.find(f'{{{geons}}}lineærPosisjon//{{{geons}}}lokalId').text
        motorways.add(local_id)
    for speed_limit_element in root.findall(f'.//{{{geons}}}Fartsgrense'):
        speed_limit_value = int(speed_limit_element.find(f'.//{{{geons}}}fartsgrenseVerdi').text)
        local_id = speed_limit_element.find(f'{{{geons}}}lineærPosisjon//{{{geons}}}lokalId').text
        speed_limits[local_id] = speed_limit_value
    road_links = root.findall(f'.//{{{geons}}}Veglenke')
    total_road_links = len(road_links)
    for road_link_no, road_link in enumerate(road_links):
        if print_progress and (road_link_no % 10000 == 0 or road_link_no + 1 == total_road_links):
            print(f'\rReading road link {road_link_no+1} of {total_road_links}'.ljust(50, ' '), end='')
        road_type = road_link.find(f'.//{{{geons}}}typeVeg').text
        # If the road type is stairs, car ferry or passenger ferry, skip it
        if road_type in ('trapp', 'bilferje', 'passasjerferje'):
            continue
        local_id = road_link.find(f'.//{{{geons}}}lokalId').text
        motorway = local_id in motorways
        speed_limit = speed_limits.get(local_id, DEFAULT_SPEED_LIMIT)
        # Check if our road is under the terrain, under the sea bottom, or under
        # a glacier (i.e. a tunnel); the medium element may be absent
        road_link_medium = road_link.find(f'{{{geons}}}medium')
        if road_link_medium is not None:
            road_link_medium = road_link_medium.text
        tunnel = road_link_medium in ('underTerrenget', 'underSjøbunnen', 'underIsbre')
        # However, if the road is a walking or biking path, we won't categorise it
        # as a tunnel (since what we really want to avoid is road tunnels, and
        # walking or biking path tunnels might just be underpasses or similar)
        if road_type in ('gangOgSykkelveg', 'sykkelveg', 'gangveg'):
            tunnel = False
        positions = road_link.find(f'.//{{{gmlns}}}posList')
        coordinates = [float(x) for x in positions.text.split(' ')]
        # The lanes describe whether this is a one-way road or not:
        # odd lane numbers run in the reference direction, even against it
        lanes = road_link.find(f'.//{{{geons}}}feltoversikt')
        if lanes is None:
            passable_same_direction = True
            passable_opposite_direction = False
        else:
            passable_same_direction = False
            passable_opposite_direction = False
            for lane in lanes.text.split('#'):
                lane_number = int(lane_number_pattern.match(lane).group(1))
                if lane_number % 2 == 1:
                    passable_same_direction = True
                else:
                    passable_opposite_direction = True
        # 'mot' means the geometry runs against the link's reference direction,
        # so the two passability flags swap
        reference_direction = road_link.find(f'.//{{{geons}}}referanseretning')
        if reference_direction is not None and reference_direction.text == 'mot':
            passable_same_direction, passable_opposite_direction = passable_opposite_direction, passable_same_direction
        # Coordinates are a flat X Y Z sequence; walk them in triples, creating a
        # node per point and an edge between each consecutive pair
        previous_node = None
        triples = zip(coordinates[0::3], coordinates[1::3], coordinates[2::3])
        for i, (current_x, current_y, current_z) in enumerate(triples):
            x_extent = (min(current_x, x_extent[0]), max(current_x, x_extent[1]))
            y_extent = (min(current_y, y_extent[0]), max(current_y, y_extent[1]))
            node_id = str(road_link_no).rjust(15, ' ') + str(i).rjust(15, ' ')
            current_node = Node(node_id, current_x, current_y, current_z)
            nodes.append(current_node)
            if previous_node is not None:
                edges.append(Edge(previous_node.node_id, current_node.node_id, speed_limit, motorway, tunnel, passable_same_direction, passable_opposite_direction))
            previous_node = current_node
    return LoadedFile(nodes, edges, x_extent, y_extent)
Add loaded file to database¶
Since the Norwegian GML data is something like 14 GB all together, we won't try to load it all into memory. Instead, we will write the files, one by one, into a database, and later we will read from the database to create a file. At least in the GML data, some nodes don't have good altitudes, so it is possible to provide a function for correcting strange altitudes. This will fire if an altitude is either None or less than -3000.
import psycopg2
import math
from collections import defaultdict
from typing import Optional, Callable
def add_to_database(loadedFile: LoadedFile, correct_altitude_function: Optional[Callable[[float, float], float]] = None, print_progress: bool = False):
    """Merge the nodes and edges of a LoadedFile into the PostGIS database.

    Nodes closer than MINIMUM_NODE_DISTANCE to an already-stored node are merged
    into that node rather than inserted again. Edges whose endpoints collapse
    into the same node, or that repeat an existing unordered node pair, are
    skipped so the no_loops / no_repeats constraints are not violated.

    Parameters:
        loadedFile: parsed source data, as returned by read_gml_file.
        correct_altitude_function: optional (x, y) -> z callback, used when a
            node's altitude is missing (None) or implausible (< -3000 m).
        print_progress: when True, print an in-place progress indicator.
    """
    connection = psycopg2.connect(database='database', host='localhost', user='database', password='database', port=5432)
    try:
        # "with connection" commits everything in one transaction on success
        with connection:
            with connection.cursor() as cursor:
                # Previously defined nodes in a dict, keyed by their (x, y) position
                # rounded to the closest MINIMUM_NODE_DISTANCE. Each value is a list
                # of points in that grid cell, as (database id, x, y, z) tuples.
                defined_points: dict[tuple[int, int], list[tuple[int, float, float, float]]] = defaultdict(list)
                # Load all nodes already in the database that overlap the extents of
                # the current file (padded by two grid cells), so previously added
                # nodes can be combined with this file's nodes
                min_x = loadedFile.x_extent[0] - 2 * MINIMUM_NODE_DISTANCE
                max_x = loadedFile.x_extent[1] + 2 * MINIMUM_NODE_DISTANCE
                min_y = loadedFile.y_extent[0] - 2 * MINIMUM_NODE_DISTANCE
                max_y = loadedFile.y_extent[1] + 2 * MINIMUM_NODE_DISTANCE
                polygon_wkt = f'POLYGON(({min_x} {min_y}, {min_x} {max_y}, {max_x} {max_y}, {max_x} {min_y}, {min_x} {min_y}))'
                # Pass values as query parameters instead of f-string interpolation:
                # safer, and lets the driver handle quoting/typing
                cursor.execute('SELECT id, ST_X(position), ST_Y(position), ST_Z(position) FROM nodes WHERE ST_Within(position, ST_GeomFromText(%s))', (polygon_wkt,))
                for node_id, x, y, z in cursor:
                    rounded_x = int(round(x / MINIMUM_NODE_DISTANCE))
                    rounded_y = int(round(y / MINIMUM_NODE_DISTANCE))
                    defined_points[(rounded_x, rounded_y)].append((node_id, x, y, z))
                node_id_to_database_id: dict[str, int] = {}
                # Add all nodes to database
                number_of_nodes = len(loadedFile.nodes)
                for node_number, node in enumerate(loadedFile.nodes):
                    if print_progress and (node_number % 10000 == 0 or node_number + 1 == number_of_nodes):
                        print(f'\rStoring node {node_number+1} of {number_of_nodes}'.ljust(50, ' '), end='')
                    # Correct node altitude if necessary and possible
                    if correct_altitude_function is not None and (node.position_z is None or node.position_z < -3000):
                        node.position_z = correct_altitude_function(node.position_x, node.position_y)
                    # If all else fails, just say the node is at sea level
                    if node.position_z is None:
                        node.position_z = 0
                    rounded_x = int(round(node.position_x / MINIMUM_NODE_DISTANCE))
                    rounded_y = int(round(node.position_y / MINIMUM_NODE_DISTANCE))
                    # Look for the closest existing node by scanning the 3x3
                    # neighbourhood of grid cells around this node's cell
                    closest_node_id = None
                    closest_node_distance = 1e99
                    for dx in (-1, 0, 1):
                        for dy in (-1, 0, 1):
                            for other_node_id, other_node_x, other_node_y, other_node_z in defined_points[(rounded_x + dx, rounded_y + dy)]:
                                distance = math.sqrt((other_node_x - node.position_x)**2 + (other_node_y - node.position_y)**2 + (other_node_z - node.position_z)**2)
                                if distance < closest_node_distance:
                                    closest_node_distance = distance
                                    closest_node_id = other_node_id
                    if closest_node_distance < MINIMUM_NODE_DISTANCE:
                        # Merge with the existing node instead of inserting a duplicate
                        node_id_to_database_id[node.node_id] = closest_node_id
                    else:
                        point_wkt = f'POINT({node.position_x} {node.position_y} {node.position_z})'
                        cursor.execute('INSERT INTO nodes(position) VALUES(ST_GeomFromText(%s)) RETURNING ID;', (point_wkt,))
                        node_db_id = cursor.fetchone()[0]
                        node_id_to_database_id[node.node_id] = node_db_id
                        defined_points[(rounded_x, rounded_y)].append((node_db_id, node.position_x, node.position_y, node.position_z))
                # Next, add all the edges
                # Store which unordered node pairs we have already added, to avoid
                # violating the no_repeats unique index
                existing_edges = set()
                number_of_edges = len(loadedFile.edges)
                for edge_number, edge in enumerate(loadedFile.edges):
                    if print_progress and (edge_number % 10000 == 0 or edge_number + 1 == number_of_edges):
                        print(f'\rStoring edge {edge_number+1} of {number_of_edges}'.ljust(50, ' '), end='')
                    start_node_id = node_id_to_database_id[edge.start_point_id]
                    end_node_id = node_id_to_database_id[edge.end_point_id]
                    # Node merging may have collapsed both endpoints into one node
                    if start_node_id == end_node_id:
                        continue
                    added_edge = (min(start_node_id, end_node_id), max(start_node_id, end_node_id))
                    if added_edge in existing_edges:
                        continue
                    existing_edges.add(added_edge)
                    cursor.execute('''
                        INSERT INTO edges(
                            start_point_id,
                            end_point_id,
                            motorway,
                            speed_limit,
                            tunnel,
                            passable_same_direction,
                            passable_opposite_direction
                        )
                        VALUES(%s, %s, %s, %s, %s, %s, %s);
                    ''', (start_node_id, end_node_id, edge.motorway, edge.speed_limit, edge.tunnel, edge.passable_same_direction, edge.passable_opposite_direction))
    finally:
        connection.close()
Write data file from database¶
Try to pack the file as tightly as possible. Since we represent the road network as a mathematical graph, we need to store the edges and the nodes of the graph. We first store the nodes, and then the edges.
First, a 4-byte integer says how many nodes are stored in the file. Then, each node consists of a 4-byte float for its x position, a 4-byte float for its y position, and a 2-byte integer for its height. Heights are represented as integers by adding 3000, rounding to the closest 10 centimetres, and then multiplying by 10. In this way, heights between around -3000 and +3500 metres can be stored. This needs some modification if you want to do this in a country with higher mountains than Norway. No ID is stored for the nodes; its position in the list is used as ID.
Edges are similarly stored, using first a 4-byte integer that says how many edges are contained. Each edge then contains a 4-byte integer containing the ID of the node it starts from, and a 4-byte integer containing the ID of the node it leads to. Finally, a single byte contains the rest of the information about the edge. The first 4 bits of this byte contain the edge's speed limit, divided by 10. The next bit is a flag saying whether the edge is passable when going against its defined direction (i.e. from the end node to the start node), and the bit after that says whether the edge is passable when going in the same direction as the edge. This is meant to represent whether the edge is a one-way street. The next bit is a flag for whether the edge is a tunnel, and the final bit is a flag saying whether the edge is a motorway.
Even though it would make more sense to write the EPSG code of the coordinates at the beginning of the file, we'll write them at the end of the file for backwards compatibility. If this string is not present, the web site will assume the data is in EPSG:32633.
import struct
import psycopg2
def write_data_file(file_name: str, print_progress: bool = False):
with open(file_name, 'wb') as roads_file:
connection = psycopg2.connect(database='database', host='localhost', user='database', password='database', port=5432)
with connection:
with connection.cursor() as cursor:
cursor.execute('SELECT COUNT(*) FROM nodes;')
(number_of_points,) = cursor.fetchone()
roads_file.write(number_of_points.to_bytes(4, 'big'))
cursor.execute('SELECT id, ST_X(position), ST_Y(position), ST_Z(position) FROM nodes ORDER BY id ASC')
id_to_position_map = {}
counter = 0
for node_id, x, y, z in cursor:
id_to_position_map[node_id] = counter
counter += 1
if print_progress and (counter % 10000 == 0 or counter == number_of_points):
print(f'\rWriting node {counter} of {number_of_points} to file'.ljust(50, ' '), end='')
roads_file.write(struct.pack('>f', x))
roads_file.write(struct.pack('>f', y))
if z > 3500 or z < -3000:
z = 0
height_int = int(round(z + 3000) * 10)
roads_file.write(height_int.to_bytes(2, 'big'))
cursor.execute('SELECT COUNT(*) FROM edges;')
(number_of_edges,) = cursor.fetchone()
roads_file.write(number_of_edges.to_bytes(4, 'big'))
cursor.execute('''
SELECT
start_point_id,
end_point_id,
motorway,
speed_limit,
tunnel,
passable_same_direction,
passable_opposite_direction
FROM edges ORDER BY id ASC;'''
)
counter = 0
for start_point_id, end_point_id, motorway, speed_limit, tunnel, passable_same_direction, passable_opposite_direction in cursor:
roads_file.write(id_to_position_map[start_point_id].to_bytes(4, 'big'))
roads_file.write(id_to_position_map[end_point_id].to_bytes(4, 'big'))
counter += 1
if print_progress and (counter % 10000 == 0 or counter == number_of_edges):
print(f'\rWriting edge {counter} of {number_of_edges} to file'.ljust(50, ' '), end='')
flags_integer = int(speed_limit / 10) << 4
if motorway:
flags_integer += 1
if tunnel:
flags_integer += 2
if passable_same_direction:
flags_integer += 4
if passable_opposite_direction:
flags_integer += 8
roads_file.write(flags_integer.to_bytes(1, 'big'))
# Write EPSG code
epsg_code_bytes = SOURCE_DATA_EPSG_CODE.encode('utf-8')
roads_file.write(len(epsg_code_bytes).to_bytes(4, 'big'))
roads_file.write(epsg_code_bytes)
Correct strange altitudes¶
For the Norwegian GML data, we will query an elevation API for strange heights, and just assume that the road at that point is at terrain level.
import requests
import json
def get_actual_elevation(x, y):
    """Query the Geonorge elevation API for the terrain height at (x, y).

    Coordinates are interpreted in EPSG:32633 (UTM33). Returns the "z" value of
    the first returned point. Raises requests.RequestException on network
    failure or timeout.
    """
    parameters = {
        'koordsys': '32633',
        'punkter': f'[[{x},{y}]]',
        'geojson': False
    }
    # Without a timeout a stuck request would hang the whole import forever
    response = requests.get('https://ws.geonorge.no/hoydedata/v1/punkt', params=parameters, timeout=30)
    return response.json()["punkter"][0]["z"]
Build data file from GML¶
In this case, all the GML files are stored in the folder named data. Read all files in this folder, store them to database, and finally write the database to file.
import os
# Build the packed data file: create tables, ingest every GML file in ./data,
# then dump the merged database to roads.dat
initialize_database()
# Sort for a deterministic processing order (os.listdir order is OS-dependent)
gml_files = sorted(os.listdir('./data'))
for file_number, gml_file in enumerate(gml_files):
    print(f'\rReading {gml_file}, number {file_number+1} of {len(gml_files)}'.ljust(50, ' '))
    data_file = read_gml_file(f'./data/{gml_file}', print_progress=True)
    add_to_database(data_file, get_actual_elevation, print_progress=True)
write_data_file('roads.dat', print_progress=True)