Применение - разъем API к подключенному приложению в Salesforce (Sandbox)Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Применение - разъем API к подключенному приложению в Salesforce (Sandbox)

Сообщение Anonymous »

hello w Ваш текст ord!
Я использую oauth jwt flow с: < /p>
  • Ключ для потребителей (sf_consumer_key) < /li>
    Consumer Secret (sf_consumer_secret)
  • url экземпляра (sf_instance_url)
ошибки.
test_salesforce.py::test_connection не удастся [20%]
test_salesforce.py::test_fetch_data не удастся [40%]
test_salesforce.py::test_create_record не удалось [60%]
test_salesforce.py: :: test_rate_limiting не удалось [100%]

=============================================================== ========= Провалы ==================================================================== ==
_________________________________________ test_connection __________________________________________ < /p>
self =

async def connect(self) -> bool:
for attempt in range(self.MAX_RETRIES):
try:
from simple_salesforce import Salesforce
import logging

logging.info(f"Attempting Salesforce connection - attempt {attempt + 1}")
username = self.credentials.get('username') or os.getenv('SF_USERNAME')
password = self.credentials.get('password') or os.getenv('SF_PASSWORD')
security_token = self.credentials.get('security_token') or os.getenv('SF_SECURITY_TOKEN')
consumer_key = self.credentials.get('consumer_key') or os.getenv('SF_CONSUMER_KEY')
consumer_secret = self.credentials.get('consumer_secret') or os.getenv('SF_CONSUMER_SECRET')

if not all([username, password, security_token]):
raise SalesforceError("Missing required credentials: username, password, or security token")

> self.client = Salesforce(
username=username,
password=password,
security_token=security_token,
consumer_key=consumer_key,
consumer_secret=consumer_secret,
domain='test' # Specifically for developer/sandbox environments
)

api_connectors.py:54:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.pythonlibs/lib/python3.11/site-packages/simple_salesforce/api.py:153: in __init__
self._refresh_session()
.pythonlibs/lib/python3.11/site-packages/simple_salesforce/api.py:307: in _refresh_session
self.session_id, self.sf_instance = self._salesforce_login_partial()
.pythonlibs/lib/python3.11/site-packages/simple_salesforce/login.py:227: in SalesforceLogin
return soap_login(soap_url, login_soap_request_body,
< /code>

soap_url = 'https://test.salesforce.com/services/Soap/u/59.0'
request_body = '\nMan-Top-Ata.123!uC70ArGo6ATwhysrY4ETlukrI\n \n \n'
headers = {'SOAPAction': 'login', 'charset': 'UTF-8', 'content-type': 'text/xml'}
proxies = {}, session =

def soap_login(
soap_url: str,
request_body: str,
headers: Optional[Headers],
proxies: Optional[Proxies],
session: Optional[requests.Session] = None) -> Tuple[str, str]:
"""Process SOAP specific login workflow."""
response = (session or requests).post(
soap_url, request_body, headers=headers, proxies=proxies)

if response.status_code != 200:
except_code: Union[str, int, None]
except_msg: str
try:
except_code = getUniqueElementValueFromXmlString(
response.content, 'sf:exceptionCode')
except_msg = (getUniqueElementValueFromXmlString(
response.content, 'sf:exceptionMessage')
or response.content.decode())
except ExpatError:
except_code = response.status_code
except_msg = response.content.decode()
> raise SalesforceAuthenticationFailed(except_code, except_msg)
E simple_salesforce.exceptions.SalesforceAuthenticationFailed: INVALID_LOGIN: Invalid username, password, security token; or user locked out.

.pythonlibs/lib/python3.11/site-packages/simple_salesforce/login.py:253: SalesforceAuthenticationFailed

The above exception was the direct cause of the following exception:

connector =

@pytest.mark.asyncio
async def test_connection(connector):
> success = await connector.connect()

test_salesforce.py:41:
< /code>

self =

async def connect(self) -> bool:
for attempt in range(self.MAX_RETRIES):
try:
from simple_salesforce import Salesforce
import logging

logging.info(f"Attempting Salesforce connection - attempt {attempt + 1}")
username = self.credentials.get('username') or os.getenv('SF_USERNAME')
password = self.credentials.get('password') or os.getenv('SF_PASSWORD')
security_token = self.credentials.get('security_token') or os.getenv('SF_SECURITY_TOKEN')
consumer_key = self.credentials.get('consumer_key') or os.getenv('SF_CONSUMER_KEY')
consumer_secret = self.credentials.get('consumer_secret') or os.getenv('SF_CONSUMER_SECRET')

if not all([username, password, security_token]):
raise SalesforceError("Missing required credentials: username, password, or security token")

self.client = Salesforce(
username=username,
password=password,
security_token=security_token,
consumer_key=consumer_key,
consumer_secret=consumer_secret,
domain='test' # Specifically for developer/sandbox environments
)
logging.info("Successfully connected to Salesforce")
return True
except Exception as e:
logging.error(f"Salesforce connection error: {str(e)}")
if attempt == self.MAX_RETRIES - 1:
> raise SalesforceError(f"Failed to connect after {self.MAX_RETRIES} attempts") from e
E api_connectors.SalesforceError: Failed to connect after 3 attempts

api_connectors.py:67: SalesforceError
< /code>
---------------------------------------- Захваченный журнал вызов журнала -------------------------------------
rouge root: api_connectors.py: 65 Salesforce Ошибка соединения: Invalid_login: неверное имя пользователя, пароль, токен безопасности; или пользователь заблокирован.
round root: api_connectors.py: 65 Ошибка подключения Salesforce: Invalid_login: неверное имя пользователя, пароль, токен безопасности; или пользователь заблокирован.
round root: api_connectors.py: 65 Ошибка подключения Salesforce: Invalid_login: неверное имя пользователя, пароль, токен безопасности; или пользователь заблокирован. < /p>
========================= ============================================== api_connectors.salesforceError: не удалось подключиться после 3 попыток
Неудача test_salesforce.py::test_fetch_data - api_connectors.salesforceError: не удалось подключиться после 3 попыток
не удалось. Подключиться после 3 попыток
неудачные test_salesforce.py::test_batch_create - api_connectors.salesforceError: не удалось подключиться после 3 попыток
неудачные test_salesforce.py::test_rate_limit /p>
Это мой api_connector.py < /strong>: < /p>
from abc import ABC, abstractmethod
from typing import Dict, Any, List
from models import Department
import asyncio
import time
import logging
import os
from datetime import datetime

class BaseAPIConnector(ABC):
def __init__(self, credentials: Dict[str, str], department: Department):
self.credentials = credentials
self.department = department

@abstractmethod
async def connect(self) -> bool:
pass

@abstractmethod
async def fetch_data(self, query: str) -> Dict[str, Any]:
pass

class SalesforceError(Exception):
"""Base exception for Salesforce operations"""
pass

class SalesforceConnector(BaseAPIConnector):
MAX_RETRIES = 3
BATCH_SIZE = 200
API_LIMIT_PER_SECOND = 20

def __init__(self, credentials: Dict[str, str], department: Department):
super().__init__(credentials, department)
self._last_request_time = 0
self._request_count = 0

async def connect(self) -> bool:
for attempt in range(self.MAX_RETRIES):
try:
from simple_salesforce import Salesforce
import logging

logging.info(f"Attempting Salesforce connection - attempt {attempt + 1}")
username = self.credentials.get('username') or os.getenv('SF_USERNAME')
password = self.credentials.get('password') or os.getenv('SF_PASSWORD')
security_token = self.credentials.get('security_token') or os.getenv('SF_SECURITY_TOKEN')
consumer_key = self.credentials.get('consumer_key') or os.getenv('SF_CONSUMER_KEY')
consumer_secret = self.credentials.get('consumer_secret') or os.getenv('SF_CONSUMER_SECRET')

if not all([username, password, security_token]):
raise SalesforceError("Missing required credentials: username, password, or security token")

self.client = Salesforce(
username=username,
password=password,
security_token=security_token,
consumer_key=consumer_key,
consumer_secret=consumer_secret,
domain='test' # Specifically for developer/sandbox environments
)
logging.info("Successfully connected to Salesforce")
return True
except Exception as e:
logging.error(f"Salesforce connection error: {str(e)}")
if attempt == self.MAX_RETRIES - 1:
raise SalesforceError(f"Failed to connect after {self.MAX_RETRIES} attempts") from e
await asyncio.sleep(2 ** attempt) # Exponential backoff
return False

async def fetch_data(self, query: str) -> Dict[str, Any]:
"""Execute SOQL query"""
try:
return self.client.query(query)
except Exception as e:
print(f"Salesforce query error: {str(e)}")
raise

async def _rate_limit(self):
"""Implement rate limiting for API calls"""
current_time = time.time()
if current_time - self._last_request_time < 1: # Within 1 second window
if self._request_count >= self.API_LIMIT_PER_SECOND:
await asyncio.sleep(1 - (current_time - self._last_request_time))
self._request_count = 0
self._last_request_time = time.time()
else:
self._request_count = 0
self._last_request_time = current_time
self._request_count += 1

async def create_record(self, sobject: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new Salesforce record"""
await self._rate_limit()
try:
logging.info(f"Creating new {sobject} record")
sf_object = getattr(self.client, sobject)
result = sf_object.create(data)
logging.info(f"Successfully created {sobject} record")
return result
except Exception as e:
logging.error(f"Salesforce create error: {str(e)}")
raise SalesforceError(f"Failed to create {sobject} record") from e

async def create_records_batch(self, sobject: str, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Create multiple records in batches"""
results = []
for i in range(0, len(records), self.BATCH_SIZE):
batch = records[i:i + self.BATCH_SIZE]
await self._rate_limit()
try:
sf_object = getattr(self.client, sobject)
batch_results = sf_object.bulk.create(batch)
results.extend(batch_results)
logging.info(f"Successfully created batch of {len(batch)} {sobject} records")
except Exception as e:
logging.error(f"Batch creation error: {str(e)}")
raise SalesforceError(f"Failed to create batch of {sobject} records") from e
return results

async def update_record(self, sobject: str, record_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Update an existing Salesforce record"""
try:
sf_object = getattr(self.client, sobject)
return sf_object.update(record_id, data)
except Exception as e:
print(f"Salesforce update error: {str(e)}")
raise

async def delete_record(self, sobject: str, record_id: str) -> bool:
"""Delete a Salesforce record"""
try:
sf_object = getattr(self.client, sobject)
return sf_object.delete(record_id)
except Exception as e:
print(f"Salesforce delete error: {str(e)}")
raise

async def describe_object(self, sobject: str) -> Dict[str, Any]:
"""Get metadata about a Salesforce object"""
try:
sf_object = getattr(self.client, sobject)
return sf_object.describe()
except Exception as e:
print(f"Salesforce describe error: {str(e)}")
raise

async def get_deleted_records(self, sobject: str, start: datetime, end: datetime) -> Dict[str, Any]:
"""Get records deleted between start and end dates"""
try:
sf_object = getattr(self.client, sobject)
return sf_object.deleted(start, end)
except Exception as e:
print(f"Salesforce get deleted records error: {str(e)}")
raise

async def get_updated_records(self, sobject: str, start: datetime, end: datetime) -> Dict[str, Any]:
"""Get records updated between start and end dates"""
try:
sf_object = getattr(self.client, sobject)
return sf_object.updated(start, end)
except Exception as e:
print(f"Salesforce get updated records error: {str(e)}")
raise

class DynamicsConnector(BaseAPIConnector):
async def connect(self) -> bool:
try:
# Implement Dynamics connection logic
return True
except Exception as e:
print(f"Dynamics connection error: {str(e)}")
return False

async def fetch_data(self, query: str) -> Dict[str, Any]:
# Implement Dynamics query execution
pass

class QlikConnector(BaseAPIConnector):
async def connect(self) -> bool:
try:
# Implement Qlik connection logic
return True
except Exception as e:
print(f"Qlik connection error: {str(e)}")
return False

async def fetch_data(self, query: str) -> Dict[str, Any]:
# Implement Qlik query execution
pass

Это мой test_salesforce.py

import pytest
import asyncio
import time
import os
from unittest.mock import Mock, patch, MagicMock
from api_connectors import SalesforceConnector, SalesforceError
from models import Department

@pytest.fixture
def test_credentials():
return {
"username": os.getenv('SF_USERNAME'),
"password": os.getenv('SF_PASSWORD'),
"security_token": os.getenv('SF_SECURITY_TOKEN'),
"consumer_key": os.getenv('SF_CONSUMER_KEY'),
"consumer_secret": os.getenv('SF_CONSUMER_SECRET')
}

@pytest.fixture
def mock_sf():
mock = MagicMock()
mock.query.return_value = {'records': []}
mock.Account = MagicMock()
mock.Account.create.return_value = {'success': True, 'id': '001xxx'}
mock.Account.bulk = MagicMock()
mock.Account.bulk.create.return_value = [{'success': True} for _ in range(5)]
return mock

@pytest.fixture
def connector(test_credentials):
with patch('simple_salesforce.Salesforce') as mock_sf_class:
mock_sf_instance = MagicMock()
mock_sf_class.return_value = mock_sf_instance
mock_sf_instance.query.return_value = {'records': []}
connector = SalesforceConnector(test_credentials, Department.MARKETING)
return connector

@pytest.mark.asyncio
async def test_connection(connector):
success = await connector.connect()
assert success == True

@pytest.mark.asyncio
async def test_fetch_data(connector):
await connector.connect()
result = await connector.fetch_data("SELECT Id, Name FROM Account LIMIT 5")
assert 'records' in result

@pytest.mark.asyncio
async def test_create_record(connector):
await connector.connect()
data = {"Name": "Test Account", "Description": "Test Description"}
result = await connector.create_record("Account", data)
assert result['success'] == True

@pytest.mark.asyncio
async def test_batch_create(connector):
await connector.connect()
records = [
{"Name": f"Test Account {i}", "Description": "Batch Test"}
for i in range(5)
]
results = await connector.create_records_batch("Account", records)
assert len(results) == 5

@pytest.mark.asyncio
async def test_rate_limiting(connector):
await connector.connect()
start_time = time.time()
requests = [connector._rate_limit() for _ in range(25)]
await asyncio.gather(*requests)
duration = time.time() - start_time
assert duration >= 1 # Should take at least 1 second due to rate limiting


Подробнее здесь: https://stackoverflow.com/questions/794 ... ce-sandbox
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Применение - разъем API к подключенному приложению в Salesforce (Sandbox)
    Anonymous » » в форуме Python
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • Как получить доступ к маркетинговому облаку Salesforce через Simple-Salesforce
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Применение com.apple.security.app-sandbox на бинарные сбои Hello World с незаконным оборудованием
    Anonymous » » в форуме C++
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous
  • Применение com.apple.security.app-sandbox на бинарные сбои Hello World с незаконным оборудованием
    Anonymous » » в форуме C++
    0 Ответы
    4 Просмотры
    Последнее сообщение Anonymous
  • Остановить удаленный доступ к подключенному к rndis устройству
    Anonymous » » в форуме Linux
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»