import httpx
from urllib.parse import urlencode
import polars as pl
import asyncio
import json
import os
import re
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from urllib.parse import urljoin
from itertools import chain
from config import BASE_URL, DATAFRAME_AFTER_ANALYSE
import time
numberofkeys = 0
OUTPUT_FILE = "/root/main-kafka-main/ALLDATASETS/DATAFROMMETAMORPHOSIS/new_2.jsonl"
RAW_RESPONSE_DIR = "raw_responses" # Directory to save raw API responses
error_file = []
def create_polling(deviceid, timestamp, slaveadd, template, modbusmap):
    # The polling template built and returned here is omitted from the post because it is long.
    return ...
# Function to fetch or load cached telemetry keys
async def get_cached_keys(client, header, entityID):
    cache_dir = "keys_cache"
    os.makedirs(cache_dir, exist_ok=True)
    cache_file = os.path.join(cache_dir, f"{entityID}.json")
    if os.path.exists(cache_file):
        with open(cache_file, "r") as f:
            keys = json.load(f)
    else:
        keys = await get_keys(client, header, entityID)
        with open(cache_file, "w") as f:
            json.dump(keys, f)
    return keys
@retry(
    retry=retry_if_exception_type(httpx.HTTPStatusError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def get_keys(client, header, entityID):
    all_keys_url = urljoin(BASE_URL, f"/api/plugins/telemetry/DEVICE/{entityID}/keys/timeseries")
    response = await client.get(all_keys_url, headers=header)
    response.raise_for_status()
    data = response.json()
    pattern = re.compile(r"^[0-9A-Fa-f]{4}(_H|_I)?$")
    # The end of this expression was cut off in the post; the "<= 6" length check is a reconstruction.
    filtered_keys = [key for key in data if key and len(key) <= 6 and pattern.match(key)]
    return filtered_keys
# NOTE: part of get_device_data was lost from the post. The signature is taken from the call
# in process_device, and the slave-address parsing is rebuilt around the surviving
# "except ValueError" fragment. The chunking of [startTs, endTs] and the request to the
# values/timeseries endpoint are a reconstruction; the chunk width and limit below are
# placeholders, not the original values.
async def get_device_data(semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType):
    try:
        filtered_keys = await get_cached_keys(client, header, entityID)
        # Device names are expected to look like "deviceid_slaveaddress".
        parts = entityName.split("_")
        deviceid = parts[0]
        try:
            slaveadd = int(parts[1] if len(parts) > 1 else "")
        except ValueError:
            slaveadd = parts[1] if len(parts) > 1 else ""
        values_url = urljoin(BASE_URL, f"/api/plugins/telemetry/DEVICE/{entityID}/values/timeseries")
        chunk_size = 3_600_000  # placeholder chunk width: 1 hour in milliseconds
        chunks = [(start, min(start + chunk_size, endTs)) for start in range(startTs, endTs, chunk_size)]
        for chunk_start, chunk_end in chunks:
            try:
                params = {
                    "keys": ",".join(filtered_keys),
                    "startTs": chunk_start,
                    "endTs": chunk_end,
                    "limit": 50000,  # placeholder
                    "useStrictDataTypes": useStrictDataTypes,
                }
                async with semaphore:  # cap the number of concurrent requests
                    response = await client.get(values_url, headers=header, params=params)
                response.raise_for_status()
                data = response.json()
                # Every timestamp present in this chunk, oldest first.
                timestamps = sorted({entry["ts"] for entries in data.values() for entry in entries})
                data_by_ts = {}
                for key, entries in data.items():
                    for entry in entries:
                        ts = entry["ts"]
                        data_by_ts.setdefault(ts, {})[key] = entry["value"]
                pattern_key = re.compile(r"^([0-9A-Fa-f]{4})(?:_H|_I)$")
                for ts in timestamps:
                    modbusmap = []
                    ts_data = data_by_ts.get(ts, {})
                    for key in filtered_keys:
                        try:
                            if key not in ts_data:
                                continue
                            if len(key) == 6:
                                # Suffixed keys: _H maps to type 0, the _I suffix to type 1.
                                new_key = key[:4].lower()
                                if key.endswith("H"):
                                    modbusmap.append({"addr": f"0x{new_key}", "type": 0, "val": ts_data[key]})
                                else:
                                    modbusmap.append({"addr": f"0x{new_key}", "type": 1, "val": ts_data[key]})
                            else:
                                # Bare 4-character keys are emitted once per register type.
                                new_key = key.lower()
                                modbusmap.append({"addr": f"0x{new_key}", "type": 0, "val": ts_data[key]})
                                modbusmap.append({"addr": f"0x{new_key}", "type": 1, "val": ts_data[key]})
                        except Exception as e:
                            print(f"Error processing key '{key}' for timestamp {ts} in chunk {chunk_start}-{chunk_end} for device {entityID}: {e}")
                            error_file.append(e)
                            continue
                    polling = create_polling(deviceid, ts, slaveadd, entityType, modbusmap)
                    yield polling
            except Exception as e:
                print(f"Error processing chunk {chunk_start}-{chunk_end} for device {entityID}: {e}")
                continue
    except Exception as r:
        print(f"Error processing device {entityID}: {r}")
async def process_device(queue, semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType):
    async for polling in get_device_data(semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType):
        await queue.put(polling)
async def writer(queue, file_path):
    with open(file_path, "w") as f:
        while True:
            item = await queue.get()
            if item is None:
                break
            json_str = json.dumps(item)
            f.write(json_str + "\n")
            queue.task_done()
async def get_all_devices_from_file(file_name, token, useStrictDataTypes, startTs, endTs):
    start_time = time.time()
    file_path = f"{DATAFRAME_AFTER_ANALYSE}{file_name}.csv"
    df_devices = pl.read_csv(file_path, columns=["id", "name", "type"])
    header = {"Authorization": f"Bearer {token}"}
    async with httpx.AsyncClient(http2=True) as client:
        semaphore = asyncio.Semaphore(50)
        queue = asyncio.Queue()
        writer_task = asyncio.create_task(writer(queue, OUTPUT_FILE))
        tasks = []
        for entityID, entityName, entityType in df_devices.select(["id", "name", "type"]).iter_rows():
            task = asyncio.create_task(
                process_device(queue, semaphore, client, header, entityID, useStrictDataTypes, startTs, endTs, entityName, entityType)
            )
            tasks.append(task)
        await asyncio.gather(*tasks)
        await queue.put(None)
        await writer_task
    end_time = time.time()
    execution_time = end_time - start_time
    return f"Execution time: {execution_time:.6f} seconds"
My questions:
- Has anyone seen similar behavior with ThingsBoard, or with any other API, when splitting a query into time chunks (see the minimal chunking sketch below)? Any scripts?
More details here: https://stackoverflow.com/questions/795 ... s-time-chu
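
For clarity, this is the kind of time-interval splitting meant above, as a minimal standalone sketch. The helper name iter_time_chunks and the one-hour default are illustrative only, not the exact code from the pipeline.

from typing import Iterator, Tuple

def iter_time_chunks(start_ts: int, end_ts: int, chunk_ms: int = 3_600_000) -> Iterator[Tuple[int, int]]:
    """Yield consecutive (chunk_start, chunk_end) pairs covering [start_ts, end_ts]."""
    current = start_ts
    while current < end_ts:
        yield current, min(current + chunk_ms, end_ts)
        current += chunk_ms

# Example: a 6-hour window split into 1-hour chunks.
for chunk_start, chunk_end in iter_time_chunks(0, 6 * 3_600_000):
    print(chunk_start, chunk_end)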