Practical examples of working with the API in Python, covering typical tasks for data engineers and developers.
Install the dependencies:

pip install requests pandas python-dotenv

Put the credentials into a .env file:

TARGETADS_API_URL=https://api.targetads.io
TARGETADS_TOKEN=your_token_here
TARGETADS_PROJECT_ID=12486

import os
import requests
from dotenv import load_dotenv
load_dotenv()
class TargetAdsClient:
def __init__(self):
self.api_url = os.getenv("TARGETADS_API_URL")
self.token = os.getenv("TARGETADS_TOKEN")
self.project_id = int(os.getenv("TARGETADS_PROJECT_ID"))
self.headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
def _request(self, method, endpoint, params=None, json=None):
"""Базовый метод для запросов"""
url = f"{self.api_url}{endpoint}"
response = requests.request(
method=method,
url=url,
params=params,
json=json,
headers=self.headers,
timeout=300
)
response.raise_for_status()
return response.json()
def get(self, endpoint, params=None):
return self._request("GET", endpoint, params=params)
def post(self, endpoint, params=None, json=None):
return self._request("POST", endpoint, params=params, json=json)
# Usage
client = TargetAdsClient()

def check_api_access(client):
    """Check token validity and project access"""
try:
token_info = client.get(
"/v1/meta/token_validate",
params={"project_id": client.project_id}
)
print(f"✓ Token valid until: {token_info['expires_at']}")
print(f"✓ Admin access: {token_info['is_admin']}")
campaigns = client.get(
"/v1/meta/campaigns",
params={"project_id": client.project_id, "active": True}
)
print(f"✓ Access to {campaigns['count']} campaigns")
return True
except requests.exceptions.HTTPError as e:
print(f"✗ API access error: {e}")
return False
# Usage
if check_api_access(client):
    print("API ready to use!")

def get_campaigns(client, active=True):
    """Fetch the list of available campaigns"""
response = client.get(
"/v1/meta/campaigns",
params={
"project_id": client.project_id,
"active": active
}
)
return response['campaigns']
# Usage
campaigns = get_campaigns(client, active=True)
for campaign in campaigns:
    print(f"{campaign['placement_id']}: {campaign['placement_name']}")

def get_raw_data_simple(client, date_from, date_to):
    """A simple raw-data request"""
payload = {
"ResponseType": "JSON",
"Fields": [
"InteractionTime",
"InteractionType",
"InteractionDeviceID",
"InteractionUrlPath"
],
"InteractionFilter": {
"DateFrom": date_from,
"DateTo": date_to,
"InteractionType": ["PageView", "Click"]
},
"Limit": 10000
}
response = client.post(
"/v1/reports/raw_reports",
params={"project_id": client.project_id},
json=payload
)
return response['data']
# Usage
data = get_raw_data_simple(client, "2024-01-01", "2024-01-31")
print(f"Retrieved {len(data)} records")

def get_raw_data_paginated(client, date_from, date_to, page_size=100000):
    """Fetch data with pagination"""
all_data = []
offset = 0
payload_template = {
"ResponseType": "JSON",
"Fields": [
"InteractionTime",
"InteractionType",
"InteractionDeviceID",
"InteractionUrlPath"
],
"InteractionFilter": {
"DateFrom": date_from,
"DateTo": date_to,
"InteractionType": ["PageView"]
}
}
while True:
payload = {**payload_template, "Offset": offset, "Limit": page_size}
print(f"Fetching records {offset} to {offset + page_size}...")
response = client.post(
"/v1/reports/raw_reports",
params={"project_id": client.project_id},
json=payload
)
data = response['data']
all_data.extend(data)
if len(data) < page_size:
break
offset += page_size
print(f"Total records: {len(all_data)}")
    return all_data
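
The paginated fetch is called the same way as the simple one; a short sketch with the same placeholder dates:

# Usage
data = get_raw_data_paginated(client, "2024-01-01", "2024-01-31")

def get_raw_data_by_campaigns(client, date_from, date_to, campaigns_per_batch=10):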
"""Разделение запросов по кампаниям для избежания таймаутов"""
campaigns = get_campaigns(client, active=True)
print(f"Processing {len(campaigns)} campaigns...")
all_data = []
    # Split into batches
for i in range(0, len(campaigns), campaigns_per_batch):
batch = campaigns[i:i + campaigns_per_batch]
campaign_ids = [c['placement_id'] for c in batch]
print(f"Batch {i // campaigns_per_batch + 1}: campaigns {campaign_ids}")
payload = {
"ResponseType": "JSON",
"Fields": [
"InteractionTime",
"InteractionType",
"InteractionMediaCampaignName",
"InteractionDeviceID"
],
"InteractionFilter": {
"DateFrom": date_from,
"DateTo": date_to,
"InteractionType": ["Impression", "Click"],
"MediaCampaignId": campaign_ids
}
}
try:
response = client.post(
"/v1/reports/raw_reports",
params={"project_id": client.project_id},
json=payload
)
all_data.extend(response['data'])
print(f" ✓ Retrieved {len(response['data'])} records")
except Exception as e:
print(f" ✗ Error: {e}")
continue
return all_data
# Usage
data = get_raw_data_by_campaigns(client, "2024-01-01", "2024-01-31")

from datetime import datetime, timedelta

def split_date_range(date_from, date_to, days_per_chunk=7):
    """Split a date range into chunks"""
current = datetime.strptime(date_from, '%Y-%m-%d')
end = datetime.strptime(date_to, '%Y-%m-%d')
chunks = []
while current < end:
chunk_end = min(current + timedelta(days=days_per_chunk), end)
chunks.append((
current.strftime('%Y-%m-%d'),
chunk_end.strftime('%Y-%m-%d')
))
current = chunk_end
return chunks
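
A quick sanity check of the splitter. Note that adjacent chunks share a boundary date, so if the API treats DateTo as inclusive you may want to shift each chunk end back by one day; that behavior is an assumption worth verifying against your data:

# Usage
for chunk in split_date_range("2024-01-01", "2024-02-01", days_per_chunk=7):
    print(chunk)  # ('2024-01-01', '2024-01-08'), ('2024-01-08', '2024-01-15'), ...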
def get_raw_data_by_weeks(client, date_from, date_to):
"""Получение данных с разбивкой по неделям"""
date_chunks = split_date_range(date_from, date_to, days_per_chunk=7)
print(f"Split into {len(date_chunks)} chunks")
all_data = []
for chunk_from, chunk_to in date_chunks:
print(f"Processing {chunk_from} to {chunk_to}...")
payload = {
"ResponseType": "JSON",
"Fields": ["InteractionTime", "InteractionType", "InteractionDeviceID"],
"InteractionFilter": {
"DateFrom": chunk_from,
"DateTo": chunk_to,
"InteractionType": ["PageView"]
}
}
response = client.post(
"/v1/reports/raw_reports",
params={"project_id": client.project_id},
json=payload
)
all_data.extend(response['data'])
print(f" Retrieved {len(response['data'])} records")
    return all_data
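
Usage mirrors the other fetch helpers:

# Usage
data = get_raw_data_by_weeks(client, "2024-01-01", "2024-01-31")

def get_campaign_performance(client, date_from, date_to):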
"""Отчет по эффективности кампаний"""
payload = {
"ResponseType": "JSON",
"Fields": ["EventDate", "MediaCampaign"],
"MediaMetrics": ["Impressions", "Clicks", "CTR", "Reach", "Frequency"],
"TargetMetrics": ["TargetEventCount", "TargetEcomAmount"],
"InteractionFilter": {
"DateFrom": date_from,
"DateTo": date_to
},
"AttributionModel": "mli",
"AttributionWindow": "30",
"DateGrouping": "day"
}
response = client.post(
"/v1/reports/agg_report",
params={"project_id": client.project_id},
json=payload
)
return response['data']
# Usage with pandas
import pandas as pd
data = get_campaign_performance(client, "2024-01-01", "2024-01-31")
df = pd.DataFrame(data)
# Type conversion
df['Impressions'] = df['Impressions'].astype(int)
df['Clicks'] = df['Clicks'].astype(int)
df['CTR'] = df['CTR'].astype(float)
df['TargetEventCount'] = df['TargetEventCount'].astype(float)
df['TargetEcomAmount'] = df['TargetEcomAmount'].astype(float)
# Derived metric: average order value (revenue per attributed conversion)
df['AOV'] = df.apply(
    lambda row: row['TargetEcomAmount'] / row['TargetEventCount']
    if row['TargetEventCount'] > 0 else 0,
    axis=1
)
print(df.head())

def compare_attribution_models(client, date_from, date_to):
    """Compare different attribution models"""
models = {
'mli': 'Media Last Interaction',
'mfi': 'Media First Interaction',
'ml': 'Media Linear',
'fl': 'Full Linear'
}
results = {}
for model_code, model_name in models.items():
print(f"Fetching {model_name}...")
payload = {
"ResponseType": "JSON",
"Fields": ["MediaCampaign"],
"MediaMetrics": ["Impressions", "Clicks"],
"TargetMetrics": ["TargetEventCount", "TargetEcomAmount"],
"InteractionFilter": {
"DateFrom": date_from,
"DateTo": date_to
},
"AttributionModel": model_code,
"AttributionWindow": "30",
"DateGrouping": "month"
}
response = client.post(
"/v1/reports/agg_report",
params={"project_id": client.project_id},
json=payload
)
df = pd.DataFrame(response['data'])
df['Model'] = model_name
results[model_code] = df
    # Combine the results
combined = pd.concat(results.values(), ignore_index=True)
return combined
# Usage
comparison = compare_attribution_models(client, "2024-01-01", "2024-01-31")
print(comparison.pivot_table(
index='MediaCampaign',
columns='Model',
values='TargetEventCount',
aggfunc='sum'
))

def get_ecommerce_report(client, date_from, date_to):
    """Detailed e-commerce report"""
payload = {
"ResponseType": "JSON",
"Fields": [
"EventDate",
"MediaCampaign",
"TargetEcomItemsCategory1"
],
"MediaMetrics": ["Impressions", "Clicks"],
"TargetMetrics": [
"TargetEventCount",
"TargetEcomAmount",
"TargetEcomQuantity"
],
"InteractionFilter": {
"DateFrom": date_from,
"DateTo": date_to
},
"TargetFilter": {
"EventType": ["Purchase"]
},
"AttributionModel": "mli",
"AttributionWindow": "30",
"DateGrouping": "day"
}
response = client.post(
"/v1/reports/agg_report",
params={"project_id": client.project_id},
json=payload
)
df = pd.DataFrame(response['data'])
    # Compute derived metrics
df['TargetEcomAmount'] = df['TargetEcomAmount'].astype(float)
df['TargetEventCount'] = df['TargetEventCount'].astype(float)
df['Clicks'] = df['Clicks'].astype(int)
df['AOV'] = df['TargetEcomAmount'] / df['TargetEventCount'] # Average Order Value
df['CR'] = (df['TargetEventCount'] / df['Clicks'] * 100) # Conversion Rate
    return df
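
A short usage sketch; the selected columns follow the fields and metrics requested above:

# Usage
ecom_df = get_ecommerce_report(client, "2024-01-01", "2024-01-31")
print(ecom_df[['MediaCampaign', 'AOV', 'CR']].head())

def get_conversion_paths(client, date_from, date_to, event_type="Purchase"):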
"""Получение путей к конверсии"""
payload = {
"ResponseType": "JSON",
"Fields": [
"PathDeviceID",
"PathPosition",
"PathLength",
"InteractionTime",
"InteractionType",
"InteractionMediaCampaignName",
"TargetEventName",
"TargetEcomAmount",
"Weight_MLI",
"Weight_FL"
],
"TargetFilter": {
"DateFrom": date_from,
"DateTo": date_to,
"EventType": [event_type]
}
}
response = client.post(
"/v1/reports/path_to_conversion",
params={"project_id": client.project_id},
json=payload
)
return pd.DataFrame(response['data'])
# Usage
paths_df = get_conversion_paths(client, "2024-01-01", "2024-01-31")
print(f"Total touchpoints: {len(paths_df)}")
print(f"Unique conversions: {paths_df['PathDeviceID'].nunique()}")

def analyze_path_length(paths_df):
    """Analyze the distribution of path lengths"""
    # Keep only the first touch of each path (the API returns positions as strings)
    first_touches = paths_df[paths_df['PathPosition'] == '1'].copy()
first_touches['PathLength'] = first_touches['PathLength'].astype(int)
    # Group by path length
length_distribution = first_touches.groupby('PathLength').agg({
'PathDeviceID': 'count',
'TargetEcomAmount': lambda x: x.astype(float).sum()
}).rename(columns={
'PathDeviceID': 'Conversions',
'TargetEcomAmount': 'Revenue'
})
length_distribution['AvgRevenue'] = (
length_distribution['Revenue'] / length_distribution['Conversions']
)
return length_distribution
# Usage
length_stats = analyze_path_length(paths_df)
print(length_stats)

def calculate_campaign_attribution(paths_df, weight_column='Weight_MLI'):
    """Compute each campaign's attributed contribution"""
df = paths_df.copy()
    # Type conversion
df['Weight'] = df[weight_column].astype(float)
df['Revenue'] = df['TargetEcomAmount'].astype(float)
    # Keep only media touchpoints
    media_touches = df[df['InteractionMediaCampaignName'] != ''].copy()
    # Weight the revenue of each touch, then group by campaign
    media_touches['WeightedRevenue'] = media_touches['Revenue'] * media_touches['Weight']
    campaign_contribution = media_touches.groupby('InteractionMediaCampaignName').agg(
        Conversions=('Weight', 'sum'),                # attributed conversions
        AttributedRevenue=('WeightedRevenue', 'sum')  # attributed revenue
    )
    # Average revenue per attributed conversion
    campaign_contribution['RevenuePerConversion'] = (
        campaign_contribution['AttributedRevenue'] /
        campaign_contribution['Conversions']
    )
return campaign_contribution.sort_values('Conversions', ascending=False)
# Compare models
mli_attribution = calculate_campaign_attribution(paths_df, 'Weight_MLI')
fl_attribution = calculate_campaign_attribution(paths_df, 'Weight_FL')
print("Media Last Interaction:")
print(mli_attribution.head())
print("\nFull Linear:")
print(fl_attribution.head())

def analyze_journey_patterns(paths_df, top_n=10):
    """Find the most common journey patterns"""
    df = paths_df.copy()
    # Cast positions to int so '10' does not sort before '2'
    df['PathPosition'] = df['PathPosition'].astype(int)
    # Build an "A -> B -> C" pattern per path, ordered by position
    paths = df.groupby('PathDeviceID').apply(
        lambda x: ' -> '.join(
            x.sort_values('PathPosition')['InteractionType'].astype(str)
        )
)
).reset_index()
paths.columns = ['PathDeviceID', 'Pattern']
    # Count the most common patterns
pattern_counts = paths['Pattern'].value_counts().head(top_n)
return pd.DataFrame({
'Pattern': pattern_counts.index,
'Count': pattern_counts.values,
'Percentage': (pattern_counts.values / len(paths) * 100).round(2)
})
# Usage
popular_patterns = analyze_journey_patterns(paths_df, top_n=20)
print(popular_patterns)

def analyze_conversion_time(paths_df):
"""Анализ времени до конверсии"""
df = paths_df.copy()
df['InteractionTime'] = pd.to_datetime(df['InteractionTime'])
df['PathPosition'] = df['PathPosition'].astype(int)
    # For each path, find the first and last touch timestamps
path_times = df.groupby('PathDeviceID').agg({
'InteractionTime': ['min', 'max'],
'PathLength': 'first',
'TargetEcomAmount': 'first'
})
path_times.columns = ['FirstTouch', 'Conversion', 'PathLength', 'Revenue']
    # Time to conversion
path_times['TimeToConversion'] = (
path_times['Conversion'] - path_times['FirstTouch']
)
path_times['HoursToConversion'] = (
path_times['TimeToConversion'].dt.total_seconds() / 3600
)
    # Summary statistics
stats = {
'AvgHours': path_times['HoursToConversion'].mean(),
'MedianHours': path_times['HoursToConversion'].median(),
'MaxHours': path_times['HoursToConversion'].max(),
'AvgPathLength': path_times['PathLength'].mean()
}
return path_times, stats
# Usage
path_times, stats = analyze_conversion_time(paths_df)
print("Conversion Time Statistics:")
for key, value in stats.items():
print(f" {key}: {value:.2f}") def export_to_csv(client, date_from, date_to, filename):
"""Экспорт данных в CSV"""
data = get_raw_data_simple(client, date_from, date_to)
df = pd.DataFrame(data)
df.to_csv(filename, index=False, encoding='utf-8')
print(f"Data exported to {filename}")
# Usage
export_to_csv(client, "2024-01-01", "2024-01-31", "raw_data.csv")

def export_to_parquet(client, date_from, date_to, filename):
    """Export data to Parquet (a compressed columnar format)"""
data = get_raw_data_by_campaigns(client, date_from, date_to)
df = pd.DataFrame(data)
    # Convert types for more compact storage
df['InteractionTime'] = pd.to_datetime(df['InteractionTime'])
df.to_parquet(filename, compression='snappy', index=False)
print(f"Data exported to {filename}")
# Usage
export_to_parquet(client, "2024-01-01", "2024-01-31", "raw_data.parquet")

def export_to_excel(client, date_from, date_to, filename):
    """Export several reports into one Excel workbook"""
with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        # Sheet 1: Campaign Performance
campaign_data = get_campaign_performance(client, date_from, date_to)
pd.DataFrame(campaign_data).to_excel(
writer, sheet_name='Campaign Performance', index=False
)
        # Sheet 2: E-commerce
ecom_data = get_ecommerce_report(client, date_from, date_to)
ecom_data.to_excel(writer, sheet_name='E-commerce', index=False)
        # Sheet 3: Campaigns List
campaigns = get_campaigns(client, active=True)
pd.DataFrame(campaigns).to_excel(
writer, sheet_name='Campaigns', index=False
)
print(f"Report exported to {filename}")
# Usage
export_to_excel(client, "2024-01-01", "2024-01-31", "report.xlsx")

from sqlalchemy import create_engine

def load_to_postgres(client, date_from, date_to, connection_string, table_name):
    """Load data into PostgreSQL"""
    # Fetch the data
data = get_raw_data_by_campaigns(client, date_from, date_to)
df = pd.DataFrame(data)
    # Type conversion
    df['InteractionTime'] = pd.to_datetime(df['InteractionTime'])
    # Create the connection
    engine = create_engine(connection_string)
    # Load the data
    df.to_sql(
        table_name,
        engine,
        if_exists='append',  # use 'replace' to overwrite
index=False,
chunksize=10000
)
print(f"Loaded {len(df)} records to {table_name}")
# Usage
load_to_postgres(
client,
"2024-01-01",
"2024-01-31",
"postgresql://user:password@localhost:5432/analytics",
"targetads_raw_data"
)

from google.cloud import bigquery

def load_to_bigquery(client, date_from, date_to, project_id, dataset_id, table_id):
    """Load data into BigQuery"""
    # Fetch the data
data = get_raw_data_by_campaigns(client, date_from, date_to)
df = pd.DataFrame(data)
    # Type conversion
    df['InteractionTime'] = pd.to_datetime(df['InteractionTime'])
    # BigQuery client
bq_client = bigquery.Client(project=project_id)
table_ref = f"{project_id}.{dataset_id}.{table_id}"
    # Configure the load job
job_config = bigquery.LoadJobConfig(
write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
schema_update_options=[
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
]
)
    # Run the load job
job = bq_client.load_table_from_dataframe(
df, table_ref, job_config=job_config
)
job.result()
print(f"Loaded {len(df)} records to {table_ref}")
# Usage
load_to_bigquery(
client,
"2024-01-01",
"2024-01-31",
"my-project",
"analytics",
"targetads_raw_data"
)

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
def extract_targetads_data(**context):
"""Извлечение данных из TargetAds"""
execution_date = context['execution_date']
date_str = execution_date.strftime('%Y-%m-%d')
client = TargetAdsClient()
data = get_raw_data_by_campaigns(client, date_str, date_str)
    # Save to intermediate storage
df = pd.DataFrame(data)
filename = f"/tmp/targetads_{date_str}.parquet"
df.to_parquet(filename)
return filename
def load_to_warehouse(**context):
"""Загрузка данных в хранилище"""
ti = context['ti']
filename = ti.xcom_pull(task_ids='extract_data')
    # Read from the file
df = pd.read_parquet(filename)
    # Load into the warehouse
engine = create_engine(os.getenv('WAREHOUSE_CONNECTION'))
df.to_sql('targetads_raw_data', engine, if_exists='append', index=False)
print(f"Loaded {len(df)} records")
# DAG
dag = DAG(
'targetads_daily_extract',
default_args=default_args,
description='Daily extract from TargetAds API',
    schedule_interval='0 2 * * *',  # every day at 02:00
catchup=False
)
extract_task = PythonOperator(
task_id='extract_data',
python_callable=extract_targetads_data,
dag=dag
)
load_task = PythonOperator(
task_id='load_to_warehouse',
python_callable=load_to_warehouse,
dag=dag
)
extract_task >> load_task
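
Before enabling the schedule, the DAG can be exercised locally with Airflow's built-in test command (assuming Airflow 2.x):

airflow dags test targetads_daily_extract 2024-01-15

import time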
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
"""Декоратор для retry с экспоненциальной задержкой"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
                        # Rate limited: wait 60 seconds before retrying
                        print("Rate limit hit, waiting 60 seconds...")
time.sleep(60)
continue
elif e.response.status_code >= 500:
                        # Server error: retry with backoff
delay = min(base_delay * (2 ** attempt), max_delay)
print(f"Server error, retrying in {delay}s... (attempt {attempt + 1}/{max_retries})")
time.sleep(delay)
continue
else:
                        # Client error: do not retry
raise
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
print(f"Request failed, retrying in {delay}s... (attempt {attempt + 1}/{max_retries})")
time.sleep(delay)
raise Exception(f"Max retries ({max_retries}) exceeded")
return wrapper
return decorator
# Usage
class RobustTargetAdsClient(TargetAdsClient):
@retry_with_backoff(max_retries=3)
def post(self, endpoint, params=None, json=None):
return super().post(endpoint, params, json)
@retry_with_backoff(max_retries=3)
def get(self, endpoint, params=None):
        return super().get(endpoint, params)
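
RobustTargetAdsClient is a drop-in replacement, so the helper functions above keep working unchanged:

# Usage
robust_client = RobustTargetAdsClient()
campaigns = get_campaigns(robust_client, active=True)

class TargetAdsAPIError(Exception):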
"""Базовое исключение для API ошибок"""
pass
class AuthenticationError(TargetAdsAPIError):
"""Ошибка аутентификации"""
pass
class ValidationError(TargetAdsAPIError):
"""Ошибка валидации параметров"""
pass
class RateLimitError(TargetAdsAPIError):
"""Превышен лимит запросов"""
pass
def handle_api_error(response):
"""Обработка ошибок API с детальной информацией"""
try:
error_data = response.json()
except:
error_data = {}
if response.status_code == 401:
raise AuthenticationError(
f"Authentication failed: {error_data.get('ErrorMessage', 'Invalid token')}"
)
elif response.status_code == 400:
errors = error_data.get('Errors', [])
error_messages = [
f"{e['FailedField']}: {e['Tag']}"
for e in errors if isinstance(e, dict)
]
raise ValidationError(
f"Validation error: {', '.join(error_messages) or error_data.get('ErrorMessage')}"
)
elif response.status_code == 429:
raise RateLimitError("Rate limit exceeded. Please wait before making more requests.")
elif response.status_code >= 500:
raise TargetAdsAPIError(f"Server error: {response.status_code}")
response.raise_for_status()
# Usage in a client
class SafeTargetAdsClient(TargetAdsClient):
def _request(self, method, endpoint, params=None, json=None):
url = f"{self.api_url}{endpoint}"
response = requests.request(
method=method,
url=url,
params=params,
json=json,
headers=self.headers,
timeout=300
)
if not response.ok:
handle_api_error(response)
        return response.json()
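
With typed exceptions, calling code can branch on the failure mode; a minimal sketch:

# Usage
safe_client = SafeTargetAdsClient()
try:
    campaigns = get_campaigns(safe_client, active=True)
except AuthenticationError as e:
    print(f"Check the token: {e}")
except ValidationError as e:
    print(f"Fix the request parameters: {e}")
except RateLimitError:
    print("Rate limited, try again later")

import logging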
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('targetads_api.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('TargetAdsAPI')
class MonitoredTargetAdsClient(TargetAdsClient):
def __init__(self):
super().__init__()
self.request_count = 0
self.error_count = 0
def _request(self, method, endpoint, params=None, json=None):
self.request_count += 1
start_time = datetime.now()
try:
logger.info(f"Request #{self.request_count}: {method} {endpoint}")
response = super()._request(method, endpoint, params, json)
duration = (datetime.now() - start_time).total_seconds()
logger.info(f"Request completed in {duration:.2f}s")
return response
except Exception as e:
self.error_count += 1
duration = (datetime.now() - start_time).total_seconds()
logger.error(
f"Request failed after {duration:.2f}s: {str(e)}",
exc_info=True
)
raise
def get_stats(self):
"""Получить статистику использования API"""
return {
'total_requests': self.request_count,
'errors': self.error_count,
'success_rate': (
(self.request_count - self.error_count) / self.request_count * 100
if self.request_count > 0 else 0
)
        }
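
A brief usage sketch for the monitored client:

# Usage
monitored = MonitoredTargetAdsClient()
get_campaigns(monitored, active=True)
print(monitored.get_stats())

import sys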
def daily_etl_pipeline(date):
"""Полный ETL пайплайн для одного дня"""
logger.info(f"Starting ETL pipeline for {date}")
    # 1. Initialize the client
client = MonitoredTargetAdsClient()
try:
        # 2. Validate access
logger.info("Validating API access...")
if not check_api_access(client):
raise Exception("API access validation failed")
        # 3. Extract raw data
logger.info("Extracting raw data...")
raw_data = get_raw_data_by_campaigns(client, date, date)
logger.info(f"Extracted {len(raw_data)} raw records")
        # 4. Extract aggregated data
logger.info("Extracting aggregated data...")
agg_data = get_campaign_performance(client, date, date)
logger.info(f"Extracted {len(agg_data)} aggregated records")
        # 5. Transform
logger.info("Transforming data...")
raw_df = pd.DataFrame(raw_data)
agg_df = pd.DataFrame(agg_data)
        # Type conversion
raw_df['InteractionTime'] = pd.to_datetime(raw_df['InteractionTime'])
for col in ['Impressions', 'Clicks']:
if col in agg_df.columns:
agg_df[col] = agg_df[col].astype(int)
        # 6. Load into the warehouse
logger.info("Loading to warehouse...")
engine = create_engine(os.getenv('WAREHOUSE_CONNECTION'))
raw_df.to_sql('targetads_raw_data', engine, if_exists='append', index=False)
agg_df.to_sql('targetads_agg_data', engine, if_exists='append', index=False)
        # 7. Usage statistics
stats = client.get_stats()
logger.info(f"Pipeline completed. Stats: {stats}")
return True
except Exception as e:
logger.error(f"Pipeline failed: {str(e)}", exc_info=True)
return False
# Entry point
if __name__ == "__main__":
date = sys.argv[1] if len(sys.argv) > 1 else datetime.now().strftime('%Y-%m-%d')
success = daily_etl_pipeline(date)
sys.exit(0 if success else 1)
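
The script can then be run ad hoc or from a scheduler; the file name here is illustrative:

python daily_etl.py 2024-01-15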