TargetAds External API Usage Examples

Practical Python examples of working with the API, covering typical tasks for data engineers and developers.

Installation and Setup
Requirements
pip install requests pandas python-dotenv
The integration examples further below additionally use sqlalchemy, openpyxl, pyarrow (or fastparquet, for Parquet export), google-cloud-bigquery, and apache-airflow; install them as needed.
Environment setup
Create a `.env` file:
TARGETADS_API_URL=https://api.targetads.io
TARGETADS_TOKEN=your_token_here
TARGETADS_PROJECT_ID=12486
Basic client
import os
import requests
from dotenv import load_dotenv

load_dotenv()

class TargetAdsClient:
    def __init__(self):
        self.api_url = os.getenv("TARGETADS_API_URL")
        self.token = os.getenv("TARGETADS_TOKEN")
        self.project_id = int(os.getenv("TARGETADS_PROJECT_ID"))
        self.headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }

    def _request(self, method, endpoint, params=None, json=None):
        """Базовый метод для запросов"""
        url = f"{self.api_url}{endpoint}"
        response = requests.request(
            method=method,
            url=url,
            params=params,
            json=json,
            headers=self.headers,
            timeout=300
        )
        response.raise_for_status()
        return response.json()

    def get(self, endpoint, params=None):
        return self._request("GET", endpoint, params=params)

    def post(self, endpoint, params=None, json=None):
        return self._request("POST", endpoint, params=params, json=json)

# Usage
client = TargetAdsClient()
Basic Operations
Checking API access
def check_api_access(client):
    """Проверка валидности токена и доступа"""
    try:
        token_info = client.get(
            "/v1/meta/token_validate",
            params={"project_id": client.project_id}
        )

        print(f"✓ Token valid until: {token_info['expires_at']}")
        print(f"✓ Admin access: {token_info['is_admin']}")

        campaigns = client.get(
            "/v1/meta/campaigns",
            params={"project_id": client.project_id, "active": True}
        )
        print(f"✓ Access to {campaigns['count']} campaigns")

        return True

    except requests.exceptions.HTTPError as e:
        print(f"✗ API access error: {e}")
        return False

# Usage
if check_api_access(client):
    print("API ready to use!")
Getting the campaign list
def get_campaigns(client, active=True):
    """Получить список доступных кампаний"""
    response = client.get(
        "/v1/meta/campaigns",
        params={
            "project_id": client.project_id,
            "active": active
        }
    )
    return response['campaigns']

# Usage
campaigns = get_campaigns(client, active=True)
for campaign in campaigns:
    print(f"{campaign['placement_id']}: {campaign['placement_name']}")
Simple Raw Data request
def get_raw_data_simple(client, date_from, date_to):
    """Простой запрос сырых данных"""
    payload = {
        "ResponseType": "JSON",
        "Fields": [
            "InteractionTime",
            "InteractionType",
            "InteractionDeviceID",
            "InteractionUrlPath"
        ],
        "InteractionFilter": {
            "DateFrom": date_from,
            "DateTo": date_to,
            "InteractionType": ["PageView", "Click"]
        },
        "Limit": 10000
    }

    response = client.post(
        "/v1/reports/raw_reports",
        params={"project_id": client.project_id},
        json=payload
    )

    return response['data']

# Usage
data = get_raw_data_simple(client, "2024-01-01", "2024-01-31")
print(f"Retrieved {len(data)} records")
Working with Large Datasets
Pagination
def get_raw_data_paginated(client, date_from, date_to, page_size=100000):
    """Получение данных с пагинацией"""
    all_data = []
    offset = 0

    payload_template = {
        "ResponseType": "JSON",
        "Fields": [
            "InteractionTime",
            "InteractionType",
            "InteractionDeviceID",
            "InteractionUrlPath"
        ],
        "InteractionFilter": {
            "DateFrom": date_from,
            "DateTo": date_to,
            "InteractionType": ["PageView"]
        }
    }

    while True:
        payload = {**payload_template, "Offset": offset, "Limit": page_size}

        print(f"Fetching records {offset} to {offset + page_size}...")

        response = client.post(
            "/v1/reports/raw_reports",
            params={"project_id": client.project_id},
            json=payload
        )

        data = response['data']
        all_data.extend(data)

        if len(data) < page_size:
            break

        offset += page_size

    print(f"Total records: {len(all_data)}")
    return all_data
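
A usage sketch for the paginated fetch (the date range is illustrative):

# Usage
data = get_raw_data_paginated(client, "2024-01-01", "2024-01-31")
print(f"Retrieved {len(data)} records")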
Splitting by campaign
def get_raw_data_by_campaigns(client, date_from, date_to, campaigns_per_batch=10):
    """Разделение запросов по кампаниям для избежания таймаутов"""
    campaigns = get_campaigns(client, active=True)
    print(f"Processing {len(campaigns)} campaigns...")

    all_data = []

    # Split into batches
    for i in range(0, len(campaigns), campaigns_per_batch):
        batch = campaigns[i:i + campaigns_per_batch]
        campaign_ids = [c['placement_id'] for c in batch]

        print(f"Batch {i // campaigns_per_batch + 1}: campaigns {campaign_ids}")

        payload = {
            "ResponseType": "JSON",
            "Fields": [
                "InteractionTime",
                "InteractionType",
                "InteractionMediaCampaignName",
                "InteractionDeviceID"
            ],
            "InteractionFilter": {
                "DateFrom": date_from,
                "DateTo": date_to,
                "InteractionType": ["Impression", "Click"],
                "MediaCampaignId": campaign_ids
            }
        }

        try:
            response = client.post(
                "/v1/reports/raw_reports",
                params={"project_id": client.project_id},
                json=payload
            )
            all_data.extend(response['data'])
            print(f"  ✓ Retrieved {len(response['data'])} records")

        except Exception as e:
            print(f"  ✗ Error: {e}")
            continue

    return all_data

# Usage
data = get_raw_data_by_campaigns(client, "2024-01-01", "2024-01-31")
Splitting by time period
from datetime import datetime, timedelta

def split_date_range(date_from, date_to, days_per_chunk=7):
    """Разделение периода на части"""
    current = datetime.strptime(date_from, '%Y-%m-%d')
    end = datetime.strptime(date_to, '%Y-%m-%d')

    chunks = []
    while current < end:
        chunk_end = min(current + timedelta(days=days_per_chunk), end)
        chunks.append((
            current.strftime('%Y-%m-%d'),
            chunk_end.strftime('%Y-%m-%d')
        ))
        current = chunk_end

    return chunks
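
A quick check of the chunking helper; note that a chunk's end date repeats as the next chunk's start date, so whether records get double-counted depends on how the API treats DateTo:

# Usage
print(split_date_range("2024-01-01", "2024-01-15", days_per_chunk=7))
# [('2024-01-01', '2024-01-08'), ('2024-01-08', '2024-01-15')]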

def get_raw_data_by_weeks(client, date_from, date_to):
    """Получение данных с разбивкой по неделям"""
    date_chunks = split_date_range(date_from, date_to, days_per_chunk=7)
    print(f"Split into {len(date_chunks)} chunks")

    all_data = []

    for chunk_from, chunk_to in date_chunks:
        print(f"Processing {chunk_from} to {chunk_to}...")

        payload = {
            "ResponseType": "JSON",
            "Fields": ["InteractionTime", "InteractionType", "InteractionDeviceID"],
            "InteractionFilter": {
                "DateFrom": chunk_from,
                "DateTo": chunk_to,
                "InteractionType": ["PageView"]
            }
        }

        response = client.post(
            "/v1/reports/raw_reports",
            params={"project_id": client.project_id},
            json=payload
        )

        all_data.extend(response['data'])
        print(f"  Retrieved {len(response['data'])} records")

    return all_data
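
Usage follows the same pattern as the other helpers; the quarter-long range here is illustrative:

# Usage
data = get_raw_data_by_weeks(client, "2024-01-01", "2024-03-31")
print(f"Retrieved {len(data)} records")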
Aggregated Reports
Basic campaign report
def get_campaign_performance(client, date_from, date_to):
    """Отчет по эффективности кампаний"""
    payload = {
        "ResponseType": "JSON",
        "Fields": ["EventDate", "MediaCampaign"],
        "MediaMetrics": ["Impressions", "Clicks", "CTR", "Reach", "Frequency"],
        "TargetMetrics": ["TargetEventCount", "TargetEcomAmount"],
        "InteractionFilter": {
            "DateFrom": date_from,
            "DateTo": date_to
        },
        "AttributionModel": "mli",
        "AttributionWindow": "30",
        "DateGrouping": "day"
    }

    response = client.post(
        "/v1/reports/agg_report",
        params={"project_id": client.project_id},
        json=payload
    )

    return response['data']

# Usage with pandas
import pandas as pd

data = get_campaign_performance(client, "2024-01-01", "2024-01-31")
df = pd.DataFrame(data)

# Type conversion
df['Impressions'] = df['Impressions'].astype(int)
df['Clicks'] = df['Clicks'].astype(int)
df['CTR'] = df['CTR'].astype(float)
df['TargetEventCount'] = df['TargetEventCount'].astype(float)
df['TargetEcomAmount'] = df['TargetEcomAmount'].astype(float)

# Derived metrics: revenue per conversion (average order value)
df['AOV'] = df.apply(
    lambda row: row['TargetEcomAmount'] / row['TargetEventCount']
    if row['TargetEventCount'] > 0 else 0,
    axis=1
)

print(df.head())
Comparing attribution models
def compare_attribution_models(client, date_from, date_to):
    """Сравнение разных моделей атрибуции"""
    models = {
        'mli': 'Media Last Interaction',
        'mfi': 'Media First Interaction',
        'ml': 'Media Linear',
        'fl': 'Full Linear'
    }

    results = {}

    for model_code, model_name in models.items():
        print(f"Fetching {model_name}...")

        payload = {
            "ResponseType": "JSON",
            "Fields": ["MediaCampaign"],
            "MediaMetrics": ["Impressions", "Clicks"],
            "TargetMetrics": ["TargetEventCount", "TargetEcomAmount"],
            "InteractionFilter": {
                "DateFrom": date_from,
                "DateTo": date_to
            },
            "AttributionModel": model_code,
            "AttributionWindow": "30",
            "DateGrouping": "month"
        }

        response = client.post(
            "/v1/reports/agg_report",
            params={"project_id": client.project_id},
            json=payload
        )

        df = pd.DataFrame(response['data'])
        df['Model'] = model_name
        results[model_code] = df

    # Combine results
    combined = pd.concat(results.values(), ignore_index=True)
    return combined

# Usage
comparison = compare_attribution_models(client, "2024-01-01", "2024-01-31")
print(comparison.pivot_table(
    index='MediaCampaign',
    columns='Model',
    values='TargetEventCount',
    aggfunc='sum'
))
E-commerce report
def get_ecommerce_report(client, date_from, date_to):
    """Детальный e-commerce отчет"""
    payload = {
        "ResponseType": "JSON",
        "Fields": [
            "EventDate",
            "MediaCampaign",
            "TargetEcomItemsCategory1"
        ],
        "MediaMetrics": ["Impressions", "Clicks"],
        "TargetMetrics": [
            "TargetEventCount",
            "TargetEcomAmount",
            "TargetEcomQuantity"
        ],
        "InteractionFilter": {
            "DateFrom": date_from,
            "DateTo": date_to
        },
        "TargetFilter": {
            "EventType": ["Purchase"]
        },
        "AttributionModel": "mli",
        "AttributionWindow": "30",
        "DateGrouping": "day"
    }

    response = client.post(
        "/v1/reports/agg_report",
        params={"project_id": client.project_id},
        json=payload
    )

    df = pd.DataFrame(response['data'])

    # Compute metrics
    df['TargetEcomAmount'] = df['TargetEcomAmount'].astype(float)
    df['TargetEventCount'] = df['TargetEventCount'].astype(float)
    df['Clicks'] = df['Clicks'].astype(int)

    df['AOV'] = df['TargetEcomAmount'] / df['TargetEventCount']  # Average Order Value
    df['CR'] = (df['TargetEventCount'] / df['Clicks'] * 100)  # Conversion Rate

    return df
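
A usage sketch that ranks results by attributed revenue (the column names match the fields requested above):

# Usage
ecom_df = get_ecommerce_report(client, "2024-01-01", "2024-01-31")
print(ecom_df.sort_values('TargetEcomAmount', ascending=False).head(10))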
Conversion Path Analysis
Basic path analysis
def get_conversion_paths(client, date_from, date_to, event_type="Purchase"):
    """Получение путей к конверсии"""
    payload = {
        "ResponseType": "JSON",
        "Fields": [
            "PathDeviceID",
            "PathPosition",
            "PathLength",
            "InteractionTime",
            "InteractionType",
            "InteractionMediaCampaignName",
            "TargetEventName",
            "TargetEcomAmount",
            "Weight_MLI",
            "Weight_FL"
        ],
        "TargetFilter": {
            "DateFrom": date_from,
            "DateTo": date_to,
            "EventType": [event_type]
        }
    }

    response = client.post(
        "/v1/reports/path_to_conversion",
        params={"project_id": client.project_id},
        json=payload
    )

    return pd.DataFrame(response['data'])

# Usage
paths_df = get_conversion_paths(client, "2024-01-01", "2024-01-31")
print(f"Total touchpoints: {len(paths_df)}")
print(f"Unique conversions: {paths_df['PathDeviceID'].nunique()}")
Path length analysis
def analyze_path_length(paths_df):
    """Анализ распределения длины путей"""
    # Берем только первое касание каждого пути
    first_touches = paths_df[paths_df['PathPosition'] == '1'].copy()
    first_touches['PathLength'] = first_touches['PathLength'].astype(int)

    # Group by path length
    length_distribution = first_touches.groupby('PathLength').agg({
        'PathDeviceID': 'count',
        'TargetEcomAmount': lambda x: x.astype(float).sum()
    }).rename(columns={
        'PathDeviceID': 'Conversions',
        'TargetEcomAmount': 'Revenue'
    })

    length_distribution['AvgRevenue'] = (
        length_distribution['Revenue'] / length_distribution['Conversions']
    )

    return length_distribution

# Usage
length_stats = analyze_path_length(paths_df)
print(length_stats)
Campaign contribution by attribution model
def calculate_campaign_attribution(paths_df, weight_column='Weight_MLI'):
    """Расчет вклада кампаний"""
    df = paths_df.copy()

    # Type conversion
    df['Weight'] = df[weight_column].astype(float)
    df['Revenue'] = df['TargetEcomAmount'].astype(float)

    # Keep only media touchpoints
    media_touches = df[df['InteractionMediaCampaignName'] != ''].copy()

    # Group by campaign
    campaign_contribution = media_touches.groupby('InteractionMediaCampaignName').agg({
        'Weight': 'sum',  # attributed conversions
        'Revenue': lambda x: (x * media_touches.loc[x.index, 'Weight']).sum()  # attributed revenue
    }).rename(columns={
        'Weight': 'Conversions',
        'Revenue': 'AttributedRevenue'
    })

    campaign_contribution['RevenuePerConversion'] = (
        campaign_contribution['AttributedRevenue'] /
        campaign_contribution['Conversions']
    )

    return campaign_contribution.sort_values('Conversions', ascending=False)

# Compare models
mli_attribution = calculate_campaign_attribution(paths_df, 'Weight_MLI')
fl_attribution = calculate_campaign_attribution(paths_df, 'Weight_FL')

print("Media Last Interaction:")
print(mli_attribution.head())

print("\nFull Linear:")
print(fl_attribution.head())
Customer journey analysis
def analyze_journey_patterns(paths_df, top_n=10):
    """Analyze the most common path patterns"""
    df = paths_df.copy()
    # PathPosition is returned as a string; convert it so touchpoints sort numerically
    df['PathPosition'] = df['PathPosition'].astype(int)

    # Build one pattern string per path
    paths = df.groupby('PathDeviceID').apply(
        lambda x: ' -> '.join(
            x.sort_values('PathPosition')['InteractionType'].astype(str)
        )
    ).reset_index()
    paths.columns = ['PathDeviceID', 'Pattern']

    # Count pattern frequency
    pattern_counts = paths['Pattern'].value_counts().head(top_n)

    return pd.DataFrame({
        'Pattern': pattern_counts.index,
        'Count': pattern_counts.values,
        'Percentage': (pattern_counts.values / len(paths) * 100).round(2)
    })

# Usage
popular_patterns = analyze_journey_patterns(paths_df, top_n=20)
print(popular_patterns)
Time-to-conversion analysis
def analyze_conversion_time(paths_df):
    """Анализ времени до конверсии"""
    df = paths_df.copy()
    df['InteractionTime'] = pd.to_datetime(df['InteractionTime'])
    df['PathPosition'] = df['PathPosition'].astype(int)

    # For each path, find the first and last touchpoint times
    path_times = df.groupby('PathDeviceID').agg({
        'InteractionTime': ['min', 'max'],
        'PathLength': 'first',
        'TargetEcomAmount': 'first'
    })

    path_times.columns = ['FirstTouch', 'Conversion', 'PathLength', 'Revenue']

    # Time to conversion
    path_times['TimeToConversion'] = (
        path_times['Conversion'] - path_times['FirstTouch']
    )
    path_times['HoursToConversion'] = (
        path_times['TimeToConversion'].dt.total_seconds() / 3600
    )

    # Summary statistics
    stats = {
        'AvgHours': path_times['HoursToConversion'].mean(),
        'MedianHours': path_times['HoursToConversion'].median(),
        'MaxHours': path_times['HoursToConversion'].max(),
        'AvgPathLength': path_times['PathLength'].mean()
    }

    return path_times, stats

# Usage
path_times, stats = analyze_conversion_time(paths_df)
print("Conversion Time Statistics:")
for key, value in stats.items():
    print(f"  {key}: {value:.2f}")
Exporting to Various Formats
Export to CSV
def export_to_csv(client, date_from, date_to, filename):
    """Экспорт данных в CSV"""
    data = get_raw_data_simple(client, date_from, date_to)
    df = pd.DataFrame(data)
    df.to_csv(filename, index=False, encoding='utf-8')
    print(f"Data exported to {filename}")

# Usage
export_to_csv(client, "2024-01-01", "2024-01-31", "raw_data.csv")
Export to Parquet
def export_to_parquet(client, date_from, date_to, filename):
    """Экспорт данных в Parquet (сжатый формат)"""
    data = get_raw_data_by_campaigns(client, date_from, date_to)
    df = pd.DataFrame(data)

    # Type conversion to optimize storage
    df['InteractionTime'] = pd.to_datetime(df['InteractionTime'])

    df.to_parquet(filename, compression='snappy', index=False)
    print(f"Data exported to {filename}")

# Usage
export_to_parquet(client, "2024-01-01", "2024-01-31", "raw_data.parquet")
Export to Excel with multiple sheets
def export_to_excel(client, date_from, date_to, filename):
    """Экспорт различных отчетов в Excel"""
    with pd.ExcelWriter(filename, engine='openpyxl') as writer:
        # Sheet 1: Campaign Performance
        campaign_data = get_campaign_performance(client, date_from, date_to)
        pd.DataFrame(campaign_data).to_excel(
            writer, sheet_name='Campaign Performance', index=False
        )

        # Sheet 2: E-commerce
        ecom_data = get_ecommerce_report(client, date_from, date_to)
        ecom_data.to_excel(writer, sheet_name='E-commerce', index=False)

        # Sheet 3: Campaigns List
        campaigns = get_campaigns(client, active=True)
        pd.DataFrame(campaigns).to_excel(
            writer, sheet_name='Campaigns', index=False
        )

    print(f"Report exported to {filename}")

# Usage
export_to_excel(client, "2024-01-01", "2024-01-31", "report.xlsx")
Integration with Analytics Platforms
Loading into PostgreSQL
from sqlalchemy import create_engine

def load_to_postgres(client, date_from, date_to, connection_string, table_name):
    """Загрузка данных в PostgreSQL"""
    # Получение данных
    data = get_raw_data_by_campaigns(client, date_from, date_to)
    df = pd.DataFrame(data)

    # Type conversion
    df['InteractionTime'] = pd.to_datetime(df['InteractionTime'])

    # Create the connection
    engine = create_engine(connection_string)

    # Load data
    df.to_sql(
        table_name,
        engine,
        if_exists='append',  # use 'replace' to overwrite
        index=False,
        chunksize=10000
    )

    print(f"Loaded {len(df)} records to {table_name}")

# Usage
load_to_postgres(
    client,
    "2024-01-01",
    "2024-01-31",
    "postgresql://user:password@localhost:5432/analytics",
    "targetads_raw_data"
)
Loading into Google BigQuery
from google.cloud import bigquery

def load_to_bigquery(client, date_from, date_to, project_id, dataset_id, table_id):
    """Загрузка данных в BigQuery"""
    # Получение данных
    data = get_raw_data_by_campaigns(client, date_from, date_to)
    df = pd.DataFrame(data)

    # Type conversion
    df['InteractionTime'] = pd.to_datetime(df['InteractionTime'])

    # BigQuery client
    bq_client = bigquery.Client(project=project_id)
    table_ref = f"{project_id}.{dataset_id}.{table_id}"

    # Load job configuration
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        schema_update_options=[
            bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION
        ]
    )

    # Load
    job = bq_client.load_table_from_dataframe(
        df, table_ref, job_config=job_config
    )
    job.result()

    print(f"Loaded {len(df)} records to {table_ref}")

# Usage
load_to_bigquery(
    client,
    "2024-01-01",
    "2024-01-31",
    "my-project",
    "analytics",
    "targetads_raw_data"
)
Integration with Apache Airflow
import os
import pandas as pd
from sqlalchemy import create_engine
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# TargetAdsClient and the helper functions above are assumed to be importable in the DAG file

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

def extract_targetads_data(**context):
    """Извлечение данных из TargetAds"""
    execution_date = context['execution_date']
    date_str = execution_date.strftime('%Y-%m-%d')

    client = TargetAdsClient()
    data = get_raw_data_by_campaigns(client, date_str, date_str)

    # Save to intermediate storage
    df = pd.DataFrame(data)
    filename = f"/tmp/targetads_{date_str}.parquet"
    df.to_parquet(filename)

    return filename

def load_to_warehouse(**context):
    """Загрузка данных в хранилище"""
    ti = context['ti']
    filename = ti.xcom_pull(task_ids='extract_data')

    # Load from file
    df = pd.read_parquet(filename)

    # Load into the warehouse
    engine = create_engine(os.getenv('WAREHOUSE_CONNECTION'))
    df.to_sql('targetads_raw_data', engine, if_exists='append', index=False)

    print(f"Loaded {len(df)} records")

# DAG
dag = DAG(
    'targetads_daily_extract',
    default_args=default_args,
    description='Daily extract from TargetAds API',
    schedule_interval='0 2 * * *',  # every day at 02:00
    catchup=False
)

extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_targetads_data,
    dag=dag
)

load_task = PythonOperator(
    task_id='load_to_warehouse',
    python_callable=load_to_warehouse,
    dag=dag
)

extract_task >> load_task
Error Handling and Retry Logic
Retry with exponential backoff
import time
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """Декоратор для retry с экспоненциальной задержкой"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)

                except requests.exceptions.HTTPError as e:
                    if e.response.status_code == 429:
                        # Rate limit: wait 60 seconds
                        print("Rate limit hit, waiting 60 seconds...")
                        time.sleep(60)
                        continue

                    elif e.response.status_code >= 500:
                        # Server error: retry with backoff
                        delay = min(base_delay * (2 ** attempt), max_delay)
                        print(f"Server error, retrying in {delay}s... (attempt {attempt + 1}/{max_retries})")
                        time.sleep(delay)
                        continue

                    else:
                        # Client error: do not retry
                        raise

                except requests.exceptions.RequestException as e:
                    if attempt == max_retries - 1:
                        raise

                    delay = min(base_delay * (2 ** attempt), max_delay)
                    print(f"Request failed, retrying in {delay}s... (attempt {attempt + 1}/{max_retries})")
                    time.sleep(delay)

            raise Exception(f"Max retries ({max_retries}) exceeded")

        return wrapper
    return decorator

# Usage
class RobustTargetAdsClient(TargetAdsClient):
    @retry_with_backoff(max_retries=3)
    def post(self, endpoint, params=None, json=None):
        return super().post(endpoint, params, json)

    @retry_with_backoff(max_retries=3)
    def get(self, endpoint, params=None):
        return super().get(endpoint, params)
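
The robust client is a drop-in replacement for the base client, so the helper functions above work unchanged:

# Usage
client = RobustTargetAdsClient()
campaigns = get_campaigns(client, active=True)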
Handling specific errors
class TargetAdsAPIError(Exception):
    """Базовое исключение для API ошибок"""
    pass

class AuthenticationError(TargetAdsAPIError):
    """Ошибка аутентификации"""
    pass

class ValidationError(TargetAdsAPIError):
    """Ошибка валидации параметров"""
    pass

class RateLimitError(TargetAdsAPIError):
    """Превышен лимит запросов"""
    pass

def handle_api_error(response):
    """Обработка ошибок API с детальной информацией"""
    try:
        error_data = response.json()
    except:
        error_data = {}

    if response.status_code == 401:
        raise AuthenticationError(
            f"Authentication failed: {error_data.get('ErrorMessage', 'Invalid token')}"
        )

    elif response.status_code == 400:
        errors = error_data.get('Errors', [])
        error_messages = [
            f"{e['FailedField']}: {e['Tag']}"
            for e in errors if isinstance(e, dict)
        ]
        raise ValidationError(
            f"Validation error: {', '.join(error_messages) or error_data.get('ErrorMessage')}"
        )

    elif response.status_code == 429:
        raise RateLimitError("Rate limit exceeded. Please wait before making more requests.")

    elif response.status_code >= 500:
        raise TargetAdsAPIError(f"Server error: {response.status_code}")

    response.raise_for_status()

# Usage in the client
class SafeTargetAdsClient(TargetAdsClient):
    def _request(self, method, endpoint, params=None, json=None):
        url = f"{self.api_url}{endpoint}"
        response = requests.request(
            method=method,
            url=url,
            params=params,
            json=json,
            headers=self.headers,
            timeout=300
        )

        if not response.ok:
            handle_api_error(response)

        return response.json()
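
A usage sketch showing how callers can react differently to validation problems and rate limits using the exceptions defined above:

# Usage
client = SafeTargetAdsClient()
try:
    campaigns = get_campaigns(client, active=True)
except ValidationError as e:
    print(f"Fix the request parameters: {e}")
except RateLimitError:
    print("Rate limit hit, wait before retrying")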
Logging and Monitoring
import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('targetads_api.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('TargetAdsAPI')

class MonitoredTargetAdsClient(TargetAdsClient):
    def __init__(self):
        super().__init__()
        self.request_count = 0
        self.error_count = 0

    def _request(self, method, endpoint, params=None, json=None):
        self.request_count += 1
        start_time = datetime.now()

        try:
            logger.info(f"Request #{self.request_count}: {method} {endpoint}")

            response = super()._request(method, endpoint, params, json)

            duration = (datetime.now() - start_time).total_seconds()
            logger.info(f"Request completed in {duration:.2f}s")

            return response

        except Exception as e:
            self.error_count += 1
            duration = (datetime.now() - start_time).total_seconds()

            logger.error(
                f"Request failed after {duration:.2f}s: {str(e)}",
                exc_info=True
            )
            raise

    def get_stats(self):
        """Получить статистику использования API"""
        return {
            'total_requests': self.request_count,
            'errors': self.error_count,
            'success_rate': (
                (self.request_count - self.error_count) / self.request_count * 100
                if self.request_count > 0 else 0
            )
        }
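
A short usage sketch for the monitored client and its collected statistics:

# Usage
client = MonitoredTargetAdsClient()
campaigns = get_campaigns(client, active=True)
print(client.get_stats())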
Full Example: An ETL Pipeline
import sys

def daily_etl_pipeline(date):
    """Полный ETL пайплайн для одного дня"""
    logger.info(f"Starting ETL pipeline for {date}")

    # 1. Initialize the client
    client = MonitoredTargetAdsClient()

    try:
        # 2. Validate access
        logger.info("Validating API access...")
        if not check_api_access(client):
            raise Exception("API access validation failed")

        # 3. Extract raw data
        logger.info("Extracting raw data...")
        raw_data = get_raw_data_by_campaigns(client, date, date)
        logger.info(f"Extracted {len(raw_data)} raw records")

        # 4. Extract aggregated data
        logger.info("Extracting aggregated data...")
        agg_data = get_campaign_performance(client, date, date)
        logger.info(f"Extracted {len(agg_data)} aggregated records")

        # 5. Transform
        logger.info("Transforming data...")
        raw_df = pd.DataFrame(raw_data)
        agg_df = pd.DataFrame(agg_data)

        # Type conversion
        raw_df['InteractionTime'] = pd.to_datetime(raw_df['InteractionTime'])
        for col in ['Impressions', 'Clicks']:
            if col in agg_df.columns:
                agg_df[col] = agg_df[col].astype(int)

        # 6. Load into the warehouse
        logger.info("Loading to warehouse...")
        engine = create_engine(os.getenv('WAREHOUSE_CONNECTION'))

        raw_df.to_sql('targetads_raw_data', engine, if_exists='append', index=False)
        agg_df.to_sql('targetads_agg_data', engine, if_exists='append', index=False)

        # 7. Statistics
        stats = client.get_stats()
        logger.info(f"Pipeline completed. Stats: {stats}")

        return True

    except Exception as e:
        logger.error(f"Pipeline failed: {str(e)}", exc_info=True)
        return False

# Run
if __name__ == "__main__":
    date = sys.argv[1] if len(sys.argv) > 1 else datetime.now().strftime('%Y-%m-%d')
    success = daily_etl_pipeline(date)
    sys.exit(0 if success else 1)