Код: Выделить всё
python
Copy code
import apache_beam as beam
p1 = beam.Pipeline()
attendance_count = (
p1
| beam.io.ReadFromText("dept_data.txt", validate=True)
| beam.Map(lambda x: x.split(","))
| beam.Filter(lambda x: x[3] == "Accounts")
| beam.Map(lambda x: (x[1], 1))
| beam.CombinePerKey(sum)
| beam.Map(lambda x: f"{x[0]},{x[1]}")
| beam.io.WriteToCsv("output/dept_op_data.csv", num_shards=1)
)
p1.run()
Код: Выделить всё
TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas
Full Traceback:
Код: Выделить всё
Traceback (most recent call last):
File "/path/to/your/script.py", line 7, in
p1
...
File "/opt/anaconda3/envs/beam/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 610, in schema_from_element_type
raise TypeError(
TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas
- Проверил validate=True в ReadFromText.
- Вернулся к использованию Map и JointPerKey.
- Изучил преобразования Beam с учетом схемы, но не смог понять, как интегрировать их в свой конвейер.
Как решить эту проблему? Нужно ли мне учитывать схему конвейера или есть более простое исправление этой ошибки? Будем очень признательны за любые рекомендации.
Подробнее здесь: https://stackoverflow.com/questions/793 ... e-hint-any
Мобильная версия