handler.py
Код: Выделить всё
import boto3
class RealClass:
    """List the not-yet-processed objects stored under a prefix of an S3 bucket."""

    # Key suffixes that mark an object as awaiting processing.
    _UNPROCESSED_SUFFIXES = (".content.txt", ".metadata.json")

    def __init__(
        self,
        bronze_bucket: "str | None" = None,
        prefix: str = "",
        *,
        s3_client=None,
    ) -> None:
        """Store the listing target and the S3 client to use.

        Args:
            bronze_bucket: Name of the bucket to list. May be left unset here
                and assigned to ``self.bronze_bucket`` later, but it must be
                set before ``get_unprocessed_files`` is called.
            prefix: Key prefix the listing is restricted to.
            s3_client: Object exposing ``get_paginator``; when omitted a
                client is created with ``boto3.client("s3")``. Passing one in
                lets callers (and tests) avoid touching AWS.
        """
        self.bronze_bucket = bronze_bucket
        self.prefix = prefix
        self.s3_client = boto3.client("s3") if s3_client is None else s3_client

    def get_unprocessed_files(self) -> list[str]:
        """Return the keys under ``self.prefix`` that end in a tracked suffix.

        Every page produced by the ``list_objects_v2`` paginator is walked;
        a page without a ``"Contents"`` entry contributes nothing.

        Returns:
            Keys ending in ``.content.txt`` or ``.metadata.json``, in the
            order the paginator yields them.

        Raises:
            ValueError: If ``self.bronze_bucket`` has not been set.
        """
        if not self.bronze_bucket:
            # Fail early with a clear message instead of an opaque error
            # from deep inside the client call.
            raise ValueError("bronze_bucket must be set before listing files")

        paginator = self.s3_client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=self.bronze_bucket, Prefix=self.prefix)

        unprocessed: list[str] = []
        for page in pages:
            unprocessed.extend(
                obj["Key"]
                for obj in page.get("Contents", [])
                # str.endswith accepts a tuple: one call covers both suffixes.
                if obj["Key"].endswith(self._UNPROCESSED_SUFFIXES)
            )
        return unprocessed
Код: Выделить всё
import unittest
from unittest.mock import patch, MagicMock
from handler import RealClass
class TestRealClass(unittest.TestCase):
    """Unit tests for RealClass that run against a mocked S3 client."""

    def setUp(self) -> None:
        """Build a RealClass whose S3 client is a mock.

        The patch has to be active *before* ``RealClass()`` runs, because the
        constructor is where ``boto3.client("s3")`` is called; a decorator on
        the test method would be applied too late for an instance created in
        ``setUp``.
        """
        patcher = patch("boto3.client")
        self.mock_boto3_client = patcher.start()
        self.addCleanup(patcher.stop)

        self.real = RealClass()
        # get_unprocessed_files reads these two attributes.
        self.real.bronze_bucket = "bronze-bucket"
        self.real.prefix = "files/"

    def test_get_unprocessed_files(self):
        """Only keys with a tracked suffix are returned from the listed pages."""
        response = [
            {
                "Contents": [
                    {"Key": "files/1001000284.content.txt"},
                    {"Key": "files/1001000284.metadata.json"},
                    # No tracked suffix: must be filtered out.
                    {"Key": "files/1001000284.txt"},
                ]
            }
        ]

        # boto3.client(...) -> client.get_paginator(...) -> paginator.paginate(...)
        paginator = MagicMock()
        paginator.paginate.return_value = response
        mock_s3 = self.mock_boto3_client.return_value
        mock_s3.get_paginator.return_value = paginator

        result = self.real.get_unprocessed_files()

        self.assertIsInstance(result, list)
        self.assertEqual(
            result,
            [
                "files/1001000284.content.txt",
                "files/1001000284.metadata.json",
            ],
        )
        self.assertIn("1001000284", result[0])
        mock_s3.get_paginator.assert_called_once_with("list_objects_v2")
        paginator.paginate.assert_called_once_with(
            Bucket="bronze-bucket", Prefix="files/"
        )
Подробнее здесь: https://stackoverflow.com/questions/789 ... -s3-client
Мобильная версия