Код: Выделить всё
class RequestAPI(beam.DoFn):
def setup(self):
self._client = language_v1.LanguageServiceClient()
def process(self, element):
classify = False
if element['lang'] =='en':
classify = True
document = {"document": {
"type_": "PLAIN_TEXT", "content": element['text']},
"features": {
"extract_entities": True,
"extract_document_sentiment": True,
"extract_entity_sentiment": True,
"classify_text": classify,
},
"encoding_type": "UTF8"
}
response = self._client.annotate_text(request=document)
new_element = element
new_element['analysis'] = {'sentences': response.sentences,
'score': response.document_sentiment.score,
'magnitude': response.document_sentiment.magnitude,
'tokens': response.tokens,
'lang_api': response.language,
'entities': response.entities
}
print(new_element)
return new_element
Код: Выделить всё
data_enrich: PCollection = (
tweets_data | beam.ParDo(RequestAPI())
| 'write data' >> beam.io.WriteToText('data.txt')
)
Я уже проверяю, что ptoblem не является ответом API или типом нового элемента (это dict).
Я использую Python 3.7 и Apache Beam версии 2.43.
Я надеюсь, что вы сможете мне помочь или дать мне какое-либо представление о что происходит.
Я пробовал это раньше с преобразованием Map, и оно выдавало ту же ошибку, и я видел несколько примеров внешнего вызова с лучом Apache, и в примерах используется DoFn классы, поэтому я попробовал, но это не сработало. Это дает мне ту же ошибку.
Подробнее здесь: https://stackoverflow.com/questions/757 ... l-api-call