Непоследовательный размер данных API при разделении 4-летнего набора данных на различные моменты времени (Thingsboord)Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Непоследовательный размер данных API при разделении 4-летнего набора данных на различные моменты времени (Thingsboord)

Сообщение Anonymous »

Я работаю с 4-летним набором данных, который я разделял на меньшие интервалы времени (кусочки), чтобы отправить запросы API на Tiksboard. Я экспериментировал с различными размерами куски - 3 часа, 6 часов, 12 часов и 24 часа. Поскольку общий период времени остается прежним (4 года), я ожидал, что общий объем полученных данных, полученных в соответствии с различными размерами кусок. Тем не менее, я вижу противоречивые общие размеры данных в зависимости от используемого размера чанка. Похоже, что несоответствие происходит от API Tiksboard, которое возвращает различные суммы данных для каждого чанка.import json
import httpx
from urllib.parse import urlencode
import polars as pl
import asyncio
import os
import re
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from urllib.parse import urljoin
from itertools import chain
from config import BASE_URL, DATAFRAME_AFTER_ANALYSE
import time

numberofkeys = 0
OUTPUT_FILE = "/root/main-kafka-main/ALLDATASETS/DATAFROMMETAMORPHOSIS/new_2.jsonl"
RAW_RESPONSE_DIR = "raw_responses" # Directory to save raw API responses
error_file = []

def create_polling(deviceid, timestamp, slaveadd, template, modbusmap):

return my temple comes here I deleted it because it is long

# Function to fetch or load cached telemetry keys

async def get_cached_keys(client, header, entityID):
cache_dir = "keys_cache"
os.makedirs(cache_dir, exist_ok=True)
cache_file = os.path.join(cache_dir, f"{entityID}.json")

if os.path.exists(cache_file):
with open(cache_file, "r") as f:
keys = json.load(f)
else:
keys = await get_keys(client, header, entityID)
with open(cache_file, "w") as f:
json.dump(keys, f)
return keys

@retry(
retry=retry_if_exception_type(httpx.HTTPStatusError),
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def get_keys(client, header, entityID):
all_keys_url = urljoin(BASE_URL, f"/api/plugins/telemetry/DEVICE/{entityID}/keys/timeseries")
response = await client.get(all_keys_url, headers=header)
response.raise_for_status()
data = response.json()
pattern = re.compile(r"^[0-9A-Fa-f]{4}(_H|_I)?$")
filtered_keys = [key for key in data if key and len(key) 1 else "")
except ValueError:
slaveadd = parts[1] if len(parts) > 1 else ""
data_by_ts = {}
for key, entries in data.items():
for entry in entries:
ts = entry["ts"]
data_by_ts.setdefault(ts, {})[key] = entry["value"]
pattern_key = re.compile(r"^([0-9A-Fa-f]{4})(?:_H|_I)$")
for ts in timestamps:
modbusmap = []
ts_data = data_by_ts.get(ts, {})
for key in filtered_keys:
try:
if key not in ts_data:
continue
if len(key) == 6:
new_key = key[:4].lower()
if key.endswith("H"):
modbusmap.append({"addr": f"0x{new_key}", "type": 0, "val": ts_data[key]})
else:
modbusmap.append({"addr": f"0x{new_key}", "type": 1, "val": ts_data[key]})
else:
new_key = key.lower()
modbusmap.append({"addr": f"0x{new_key}", "type": 0, "val": ts_data[key]})
modbusmap.append({"addr": f"0x{new_key}", "type": 1, "val": ts_data[key]})
except Exception as e:
print(f"Error processing key '{key}' for timestamp {ts} in chunk {chunk_start}-{chunk_end} for device {entityID}: {e}")
error_file.append(e)
continue
polling = create_polling(deviceid, ts, slaveadd, entityType, modbusmap)
yield polling
except Exception as e:
print(f"Error processing chunk {chunk_start}-{chunk_end} for device {entityID}: {e}")

continue
except Exception as r:
print(f"Error processing device {entityID}: {r}")

async def process_device(queue, semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType):

async for polling in get_device_data(semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType):
await queue.put(polling)

async def writer(queue, file_path):

with open(file_path, "w") as f:
while True:
item = await queue.get()
if item is None:
break
json_str = json.dumps(item)
f.write(json_str + "\n")
queue.task_done()

async def get_all_devices_from_file(file_name, token, useStrictDataTypes, startTs, endTs):

start_time = time.time()
file_path = f"{DATAFRAME_AFTER_ANALYSE}{file_name}.csv"
df_devices = pl.read_csv(file_path, columns=["id", "name", "type"])
header = {"Authorization": f"Bearer {token}"}

async with httpx.AsyncClient(http2=True) as client:
semaphore = asyncio.Semaphore(50)
queue = asyncio.Queue()
writer_task = asyncio.create_task(writer(queue, OUTPUT_FILE))

tasks = []
for entityID, entityName, entityType in df_devices.select(["id", "name", "type"]).iter_rows():
task = asyncio.create_task(
process_device(queue, semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType)
)
tasks.append(task)

await asyncio.gather(*tasks)
await queue.put(None)
await writer_task

end_time = time.time()
execution_time = end_time - start_time
return f"Execution time: {execution_time:.6f} seconds"
< /code>
Мои вопросы: < /p>
  • Кто -нибудь испытывал подобное поведение с вещами или любым другим API при разделении данных по интервалу времени? Сценарии?


Подробнее здесь: https://stackoverflow.com/questions/795 ... s-time-chu
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»