Как правильно разобрать файл gpx?Python

Программы на Python
Ответить
Anonymous
 Как правильно разобрать файл gpx?

Сообщение Anonymous »

Как вам мой код?Как вам мой код?Как вам мой код?Как вам мой код?Как вам мой код?Как вам мой код?Как вам мой код?
Импорт
import gpxpy
import geopandas as gpd
import contextily as ctx
from geopy.distance import geodesic
from PIL import Image, ImageEnhance
from shapely.geometry import box, LineString

import requests
import pandas as pd
import os, time
import matplotlib.pyplot as plt
import seaborn as sns
from random import randint, uniform

Загружает файлы GPX с предоставленных URL-адресов и сохраняет их локально.
def download_gpx(links):
for num, url in enumerate(links):
try:
response = requests.get(url)
filename = f"track{num}.gpx"

with open(f"data/gpx/{filename}", mode="wb") as f:
f.write(response.content)
except Exception as e:
print("Error")

df = pd.DataFrame(columns=["track_id", "track_time", "latitude", "longitude", "altitude"])

Преобразует файлы GPX в изображения PNG с визуализацией карт и треков.
def gpx_to_png(df):
margin = 0.02
img_path = "data/image"
os.makedirs(img_path, exist_ok=True)

for i in os.listdir("data/gpx"):
png_name = f"{i[:-4]}.png"
with open(f"data/gpx/{i}", encoding="UTF-8") as f:
gpx = gpxpy.parse(f)
lats, lons = [], []
for track in gpx.tracks:
for segment in track.segments:
for point in segment.points:
lats.append(point.latitude)
lons.append(point.longitude)
df.loc[len(df)] = [i, point.time, point.latitude, point.longitude, point.elevation]

bbox = box(
min(lons) - margin,
min(lats) - margin,
max(lons) + margin,
max(lats) + margin
)

track_line = LineString(zip(lons, lats))

gdf_bbox = gpd.GeoDataFrame(geometry=[bbox], crs="EPSG:4326")
gdf_bbox_web = gdf_bbox.to_crs(epsg=3857)

gdf_track = gpd.GeoDataFrame(geometry=[track_line], crs="EPSG:4326")
gdf_track_web = gdf_track.to_crs(epsg=3857)

fig, ax = plt.subplots(figsize=(10, 8))
gdf_bbox_web.plot(ax=ax, alpha=0)
gdf_track_web.plot(ax=ax, color="red", linewidth=2)

ctx.add_basemap(ax, crs=gdf_bbox_web.crs, source=ctx.providers.OpenStreetMap.Mapnik)

ax.set_axis_off()
plt.savefig(f"{img_path}/{png_name}", dpi=150, bbox_inches="tight", pad_inches=0)
plt.close(fig)

return df

Получает данные о температуре из API Open-Meteo для определенного местоположения и даты.
df['track_time'] = df['track_time'].dt.strftime('%Y-%m-%d')

def temp(lat, lon, date):
url = "https://archive-api.open-meteo.com/v1/archive"

params = {
'latitude': lat,
'longitude': lon,
'start_date': date,
'end_date': date,
'hourly': 'temperature_2m',
"timezone":"auto"
}
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0'}

response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
if response.status_code == 200:
data = response.json()
return data["hourly"]['temperature_2m'][12]
else:
print(f"Error")

Интерполирует температуру между ключевыми точками для всех точек трека.
def analysis_weather(df):
n = len(df)
key_indexes = [0, n//4, n//2, 3*n//4, n-1]
temperatures_at_key_points = {}

for idx in key_indexes:
lat = df.iloc[idx]["latitude"]
lon = df.iloc[idx]["longitude"]
date = df.iloc[idx]["track_time"]

temp_value = temp(lat, lon, date)

temperatures_at_key_points[idx] = temp_value

if len(temperatures_at_key_points) < 2:
print("Not enough data for interpolation, please check the API functionality")
df["temperature"] = None
return df

all_temperatures = []
left_idx = 0
right_idx = 1

for i in range(n):
if (right_idx < len(key_indexes) - 1 and i >= key_indexes[right_idx]):
left_idx += 1
right_idx += 1

left_key = key_indexes[left_idx]
right_key = key_indexes[right_idx]

left_temp = temperatures_at_key_points[left_key]
right_temp = temperatures_at_key_points[right_key]

if i in temperatures_at_key_points:
temperature = temperatures_at_key_points
elif left_temp is not None and right_temp is not None:
temperature = left_temp + (right_temp - left_temp) * (i - left_key) / (right_key - left_key)
else:
temperature = left_temp if left_temp is not None else right_temp

all_temperatures.append(temperature)

df = df.copy()
df["temperature"] = all_temperatures

return df

Применяет анализ погоды к нескольким трекам и объединяет результаты.
def get_temp(df):
try:
df_temp = pd.DataFrame()
for i in range(0, 3):
track_data = df[df["track_id"] == f"track{i}.gpx"]
track_data_weather = analysis_weather(track_data)
df_temp = pd.concat([df_temp, track_data_weather])
print(f"track{i} add")
df = df_temp.copy()
return df
except Exception as e:
print(f"Error {e}")

Извлекает информацию о географическом регионе с помощью OpenStreetMap Nominatim API.
def extract_map_region(lat: float, lon: float):
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/536.36 (KHTML, like Gecko) Chrome/58.0.3029.100 Safari/536.3'}
response = requests.get(f"https://nominatim.openstreetmap.org/rev ... ormat=json", headers=headers)
json = response.json()
time.sleep(1.5)
if "county" in json["address"]:
return json["address"]["county"]
if "state" in json["address"]:
return json["address"]["state"]
return json["address"]["country"]
except Exception as e:
print(f"Error {e}")

Определяет географический регион для точек отслеживания.
Вычисляет частоту шагов на основе расстояния между последовательными точками отслеживания. Предполагается, что средняя длина шага составляет 0,75 метра, чтобы преобразовать расстояние в шаги.
def analysis_region(df):
try:
lat = df.iloc[0]["latitude"]
lon = df.iloc[0]["longitude"]
df = df.copy()
df["region"] = extract_map_region(lat, lon)
return df
except Exception as e:
print(f"Error {e}")

def step_frequency(df):
points = list(zip(df["latitude"], df["longitude"]))
step = [0]

for p1, p2 in zip(points, points[1:]):
dist = geodesic(p1, p2).meters
step.append(dist / 0.75)
df["steps"] = step
return df

Анализирует тип местности и ключевые географические объекты вокруг точек трека.
def terrain_type(df):
overpass_endpoints = [
"https://overpass-api.de/api/interpreter",
"https://overpass.kumi.systems/api/interpreter",
"https://overpass.openstreetmap.ru/cgi/interpreter"
]
try:
points = list(zip(df["latitude"], df["longitude"]))
rep_points = [points[0], points[len(points)//2], points[-1]] # 3 точки

all_landuse, all_natural, all_key_objects = [], [], set()

for lat, lon in rep_points:
overpass_query = f"""
[out:json][timeout:45];
(
way(around:500,{lat},{lon})["landuse"];
way(around:500,{lat},{lon})["natural"];
way(around:500,{lat},{lon})["leisure"];
way(around:500,{lat},{lon})["waterway"="river"];
way(around:500,{lat},{lon})["waterway"="stream"];
way(around:500,{lat},{lon})["natural"="water"];
node(around:500,{lat},{lon})["place"="city"];
node(around:500,{lat},{lon})["place"="town"];
node(around:500,{lat},{lon})["place"="village"];
node(around:500,{lat},{lon})["natural"="peak"];
node(around:500,{lat},{lon})["natural"="mountain"];
);
out tags center;
"""

for endpoint in overpass_endpoints:
response = requests.get(endpoint, params={'data': overpass_query}, timeout=60)
if response.status_code == 200:
data = response.json()
break

for element in data.get('elements', []):
tags = element.get('tags', {})

if 'landuse' in tags:
all_landuse.append(tags['landuse'])
elif 'natural' in tags:
all_natural.append(tags['natural'])
if tags['natural'] in ['peak', 'mountain'] and 'name' in tags:
all_key_objects.add(f"Mountain: {tags['name']}")

if 'waterway' in tags and tags['waterway'] in ['river', 'stream'] and 'name' in tags:
all_key_objects.add(f"River: {tags['name']}")

if 'place' in tags and tags['place'] in ['city', 'town', 'village'] and 'name' in tags:
all_key_objects.add(f"Settlement: {tags['name']} ({tags['place']})")

if element.get('type') == 'way' and 'natural' in tags and tags['natural'] == 'water' and 'name' in tags:
all_key_objects.add(f"Lake: {tags['name']}")

time.sleep(1.5)

terrain_type = "unknown"
if all_landuse:
terrain_type = max(set(all_landuse), key=all_landuse.count)
elif all_natural:
terrain_type = max(set(all_natural), key=all_natural.count)

key_objects_str = "; ".join(sorted(all_key_objects)) if all_key_objects else None

df["terrain_type"] = terrain_type
df["key_objects_str"] = key_objects_str
except Exception as e:
print(f"Error {e}")
return df

Создает графики оценки плотности ядра для всех числовых столбцов в DataFrame.
def norm_or_not(df):
n = len(df.columns) // 5 + bool(len(df.columns) % 5)
_, axes = plt.subplots(nrows=n, ncols=5, figsize=(15,20))

for idx, i in enumerate(df.columns):
sns.kdeplot(data=df, x=i, common_norm=False, ax=axes[idx // 5][idx%5])

plt.title(i)
plt.show()

Выполняет пополнение данных на изображениях карт для создания вариантов набора обучающих данных.
def data_augmentation():
images_path = "data/image"
os.makedirs(images_path, exist_ok=True)

for filename in os.listdir(images_path):
if filename.lower().endswith(".png") and not any(word in filename for word in ["rotated", "contrasted", "brightness"]):
img_path = os.path.join(images_path, filename)
img = Image.open(img_path)

base_name = os.path.splitext(filename)[0]

rotated_img = img.rotate(randint(10, 60))
rotated_img.save(os.path.join(images_path, f"{base_name}_rotated.png"))

contrasted_img = ImageEnhance.Contrast(img).enhance(randint(2, 4))
contrasted_img.save(os.path.join(images_path, f"{base_name}_contrasted.png"))

brightness_img = ImageEnhance.Brightness(img).enhance(uniform(1.2, 1.6))
brightness_img.save(os.path.join(images_path, f"{base_name}_brightness.png"))


Подробнее здесь: https://stackoverflow.com/questions/798 ... a-gpx-file
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»