Tutorials
- For data engineers
- For platform engineers
Develop
- Overview
- Write and run workflows
- Configure runtime behavior
- Manage state and configuration
- Manage concurrency
- Manage settings
- Test workflows
Deploy
- Overview
- Run flows in local processes
- Run flows on dynamic infrastructure
- Dynamic infrastructure examples
- Static infrastructure examples
- Daemonize worker processes
Automate
Operate
- Overview
- Prefect Cloud
- Prefect server
Extract data from websites
Use Prefect to fetch and analyze large amounts of data
In the Build a data pipeline tutorial, you learned how to create resilient and performant data pipelines. Now you’ll learn how to handle data dependencies and ingest large amounts of data by building a GitHub issue analysis pipeline.
The real world can present additional challenges when dealing with web data:
- API requests can fail or give a response with missing or malformed data.
- You need to make multiple dependent API calls.
- You need to ingest data when you don’t know in advance how much data is available.
Set up error handling
Throw and catch errors to handle them gracefully. For example, if you don’t get a 2xx response from an API, throw an exception and log the error.
from typing import Optional
from prefect import task
@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[dict]:
"""Fetch a page of issues for a GitHub repository"""
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status() # Raise an exception if the response is not a 2xx status code
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
Run the following code to see error handling in action:
from typing import List, Optional
import httpx
from prefect import flow, task
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
print(f"Analyzing {repo}...")
# Fetch and analyze all issues
fetch_page_of_issues(repo)
@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[dict]:
"""Fetch a page of issues for a GitHub repository"""
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status() # Raise an exception if the response is not a 2xx status code
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
if __name__ == "__main__":
analyze_repo_health([
"PrefectHQ/prefect",
"this-repo-does-not-exist/404" # This repo will trigger an error
])
Ingest large amounts of data
Use pagination to fetch large amounts of data and run tasks concurrently to analyze the data efficiently:
from typing import List
from prefect import flow
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
all_issues = []
for repo in repos:
for page in range(1, 3): # Get first 2 pages
issues = fetch_page_of_issues(repo, page)
if not issues:
break
all_issues.extend(issues)
# Run issue analysis tasks concurrently
for issue in all_issues:
analyze_issue.submit(issue) # Submit each task to a task runner
# Wait for all analysis tasks to complete
for detail in issue_details:
result = detail.result() # Block until the task has completed
print(f"Analyzed issue #{result['number']}")
Run the following code to see pagination and concurrent tasks in action:
from typing import List, Optional
import httpx
from prefect import flow, task
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
print(f"Analyzing {repo}...")
# Fetch and analyze all issues
fetch_repo_issues(repo)
@flow
def fetch_repo_issues(repo: str):
"""Fetch all issues for a single repository"""
all_issues = []
page = 1
for page in range(1, 3): # Limit to 2 pages to avoid hitting rate limits
issues = fetch_page_of_issues(repo, page)
if not issues or len(issues) == 0:
break
all_issues.extend(issues)
page += 1
issue_details = []
for issue in all_issues[:5]: # Limit to 5 issues to avoid hitting rate limits
issue_details.append(
fetch_issue_details.submit(repo, issue['number']) # Submit each task to a task runner
)
details = []
for issue in issue_details:
details.append(issue.result())
return details
@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[dict]:
"""Fetch a page of issues for a GitHub repository"""
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
@task
def fetch_issue_details(repo: str, issue_number: int) -> dict:
"""Fetch detailed information about a specific issue"""
response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
issue_data = response.json()
return issue_data
if __name__ == "__main__":
analyze_repo_health([
"PrefectHQ/prefect",
"pydantic/pydantic",
"huggingface/transformers"
])
Structure your code with dependent nested flows and tasks
Use nested flows and tasks to help distribute tasks more efficiently and aid with debugging.
- Use nested flows for more complex operations that involve multiple steps.
- Use tasks for simpler, atomic operations.
Here’s an example of how to use nested flows and tasks:
from typing import List
from prefect import flow, task
@flow
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
# Fetch and analyze all issues
issues = fetch_repo_issues(repo)
# Calculate metrics
resolution_rate = calculate_resolution_rate(issues)
# ...
@flow
def fetch_repo_issues(repo: str):
"""Nested flow: Fetch all data for a single repository"""
# ...
@task
def calculate_resolution_rate(issues: List[dict]) -> float:
"""Task: Calculate the percentage of closed issues"""
# ...
Run the following code to see metrics calculation in action:
from typing import List, Optional
import httpx
from prefect import flow, task
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
print(f"Analyzing {repo}...")
# Fetch and analyze all issues
issues = fetch_repo_issues(repo)
# Calculate metrics
resolution_rate = calculate_resolution_rate(issues)
print(f"Resolution rate: {resolution_rate:.1f}%")
@flow
def fetch_repo_issues(repo: str):
"""Fetch all issues for a single repository"""
all_issues = []
page = 1
for page in range(1, 3): # Limit to 2 pages to avoid hitting rate limits
issues = fetch_page_of_issues(repo, page)
if not issues or len(issues) == 0:
break
all_issues.extend(issues)
page += 1
issue_details = []
for issue in all_issues[:5]: # Limit to 5 issues to avoid hitting rate limits
issue_details.append(
fetch_issue_details.submit(repo, issue['number'])
)
details = []
for issue in issue_details:
details.append(issue.result())
return details
@task(log_prints=True)
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[dict]:
"""Fetch a page of issues for a GitHub repository"""
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
@task
def fetch_issue_details(repo: str, issue_number: int) -> dict:
"""Fetch detailed information about a specific issue"""
response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
issue_data = response.json()
return issue_data
@task
def calculate_resolution_rate(issues: List[dict]) -> float:
"""Calculate the percentage of closed issues"""
if not issues:
return 0
closed = sum(1 for issue in issues if issue['state'] == 'closed')
return (closed / len(issues)) * 100
if __name__ == "__main__":
analyze_repo_health([
"PrefectHQ/prefect",
"pydantic/pydantic",
"huggingface/transformers"
])
Put it all together
Here’s the complete flow that combines all of these components. We’ll also add retries, caching, and rate limiting to make the workflow more robust.
from datetime import timedelta, datetime
from statistics import mean
from typing import List, Optional
import httpx
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.concurrency.sync import rate_limit
@flow(log_prints=True)
def analyze_repo_health(repos: List[str]):
"""Analyze issue health metrics for GitHub repositories"""
for repo in repos:
print(f"Analyzing {repo}...")
# Fetch and analyze all issues
issues = fetch_repo_issues(repo)
# Calculate metrics
avg_response_time = calculate_response_times(issues)
resolution_rate = calculate_resolution_rate(issues)
print(f"Average response time: {avg_response_time:.1f} hours")
print(f"Resolution rate: {resolution_rate:.1f}%")
@flow
def fetch_repo_issues(repo: str):
"""Fetch all issues for a single repository"""
all_issues = []
page = 1
for page in range(1, 3): # Limit to 2 pages to avoid hitting rate limits
issues = fetch_page_of_issues(repo, page)
if not issues or len(issues) == 0:
break
all_issues.extend(issues)
page += 1
issue_details = []
for issue in all_issues[:5]: # Limit to 5 issues to avoid hitting rate limits
issue_details.append(
fetch_issue_details.submit(repo, issue['number'])
)
details = []
for issue in issue_details:
details.append(issue.result())
return details
@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_page_of_issues(repo: str, page: int = 1) -> Optional[dict]:
"""Fetch a page of issues for a GitHub repository"""
rate_limit("github-api")
try:
response = httpx.get(
f"https://api.github.com/repos/{repo}/issues",
params={"page": page, "state": "all", "per_page": 100}
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error fetching issues for {repo}: {e}")
return None
@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_issue_details(repo: str, issue_number: int) -> dict:
"""Fetch detailed information about a specific issue"""
rate_limit("github-api")
response = httpx.get(f"https://api.github.com/repos/{repo}/issues/{issue_number}")
issue_data = response.json()
# Fetch comments for the issue
comments = fetch_comments(issue_data['comments_url'])
issue_data['comments_data'] = comments
return issue_data
@task(log_prints=True, retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_comments(comments_url: str) -> List[dict]:
"""Fetch comments for an issue"""
rate_limit("github-api")
try:
response = httpx.get(comments_url)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error fetching comments: {e}")
return []
@task
def calculate_response_times(issues: List[dict]) -> float:
"""Calculate average time to first response for issues"""
response_times = []
for issue in issues:
comments_data = issue.get('comments_data', [])
if comments_data: # If there are comments
created = datetime.fromisoformat(issue['created_at'].replace('Z', '+00:00'))
first_comment = datetime.fromisoformat(
comments_data[0]['created_at'].replace('Z', '+00:00')
)
response_time = (first_comment - created).total_seconds() / 3600
response_times.append(response_time)
return mean(response_times) if response_times else 0
@task
def calculate_resolution_rate(issues: List[dict]) -> float:
"""Calculate the percentage of closed issues"""
if not issues:
return 0
closed = sum(1 for issue in issues if issue['state'] == 'closed')
return (closed / len(issues)) * 100
if __name__ == "__main__":
analyze_repo_health([
"PrefectHQ/prefect",
"pydantic/pydantic",
"huggingface/transformers"
])
Before running this code, make sure to set up the GitHub API rate limit:
# GitHub has a rate limit of 60 unauthenticated requests per hour (~0.016 requests per second)
prefect gcl create github-api --limit 60 --slot-decay-per-second 0.016
Run your analysis:
python repo_analysis.py
The output should look something like this:
10:59:13.933 | INFO | prefect.engine - Created flow run 'robust-kangaroo' for flow 'analyze-repo-health'
10:59:13.934 | INFO | prefect.engine - View at http://127.0.0.1:4200/runs/flow-run/abdf7f46-6d59-4857-99cd-9e265cadc4a7
10:59:13.954 | INFO | Flow run 'robust-kangaroo' - Analyzing PrefectHQ/prefect...
...
10:59:27.631 | INFO | Flow run 'robust-kangaroo' - Average response time: 0.4 hours
10:59:27.631 | INFO | Flow run 'robust-kangaroo' - Resolution rate: 40.0%
10:59:27.632 | INFO | Flow run 'robust-kangaroo' - Analyzing pydantic/pydantic...
...
10:59:40.990 | INFO | Flow run 'robust-kangaroo' - Average response time: 0.0 hours
10:59:40.991 | INFO | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:40.991 | INFO | Flow run 'robust-kangaroo' - Analyzing huggingface/transformers...
...
10:59:54.225 | INFO | Flow run 'robust-kangaroo' - Average response time: 1.1 hours
10:59:54.225 | INFO | Flow run 'robust-kangaroo' - Resolution rate: 0.0%
10:59:54.240 | INFO | Flow run 'robust-kangaroo' - Finished in state Completed()
Next steps
In this tutorial, you built a complex data extraction pipeline which uses the following new techniques:
- Error recovery with try/catch blocks
- Modularized workflows with dependent nested flows and tasks
- Efficient ingestion and processing of large data with pagination and concurrent tasks
Now that you’ve finished this tutorial series, continue your learning journey by going deep on the following topics:
- Write flows and tasks
- Manage Prefect Cloud and server instances
- Run workflows on work pools using Kubernetes, Docker, and serverless infrastructure.
Need help? Book a meeting with a Prefect Product Advocate to get your questions answered.