Python PostgreSQL Connection Guide
This guide covers how to set up a Python environment using uv, manage dependencies, and connect to a PostgreSQL database securely using environment variables.
1. Install uv (Fast Python Package Installer)
uv is an extremely fast Python package installer and resolver. It is designed to save disk space and speed up your workflow by using a global cache for dependencies, meaning you don't need to re-download the same packages for every project.
Installation on Linux (HPC)
On most Linux systems (including HPC environments), you can install uv with a single command:
curl -LsSf https://astral.sh/uv/install.sh | sh
Verify the installation:
uv --version
2. Initialize Project & Install Dependencies
Create a new directory for your project and initialize it.
# Create project directory
mkdir my-research-project
cd my-research-project
# Initialize a new Python project with uv
uv init
uv init vs uv venv
uv init: Creates a new project structure (including a pyproject.toml file). It sets up a modern Python project managed by uv, allowing you to declare dependencies that are reproducible across different environments. This is recommended for new projects. uv venv: Just creates a virtual environment in the current directory (similar to python -m venv). It doesn't create a project file (pyproject.toml) or manage dependencies for you. Use this if you just want a quick, throwaway environment or are managing a legacy project without a pyproject.toml.
Add the necessary libraries for PostgreSQL connection and environment variable management. We will use psycopg2-binary for the database driver and python-dotenv to load credentials.
# Add dependencies
uv add psycopg2-binary python-dotenv pandas
This command creates a virtual environment (if one doesn't exist), installs the packages, and updates your pyproject.toml file.
3. Secure Database Credentials
Never hardcode your database passwords in your code. Use a .env file instead.
- Create a file named .env in your project root:
touch .env
- Add your database credentials to the .env file:
# .env file
DB_HOST=your_server_host
DB_PORT=5432
DB_NAME=your_database_name
DB_USER=your_username
DB_PASSWORD=your_secure_password
- Important: if you use git, add .env to your .gitignore file to prevent it from being committed to version control.
echo ".env" >> .gitignore
4. Connect to PostgreSQL
Here is a complete Python script (main.py) that loads the credentials and connects to the database.
import os
import psycopg2
import pandas as pd
from dotenv import load_dotenv

# 1. Load environment variables from the .env file so the os.getenv()
# calls below can read the DB_* credentials defined there.
load_dotenv()
def get_db_connection():
    """Open a PostgreSQL connection from DB_* environment variables.

    Returns the connection object on success, or None when the attempt
    fails (the error is printed rather than raised).
    """
    params = {
        "host": os.getenv("DB_HOST"),
        "database": os.getenv("DB_NAME"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
        # Fall back to the standard PostgreSQL port when DB_PORT is unset.
        "port": os.getenv("DB_PORT", "5432"),
    }
    try:
        return psycopg2.connect(**params)
    except Exception as e:
        print(f"Error connecting to database: {e}")
        return None
def main():
    """Connect to the database, print its version, and list five tables
    from the 'research' schema."""
    # 2. Connect; get_db_connection() already printed the error when it
    # returns None, so there is nothing more to do in that case.
    conn = get_db_connection()
    if conn is None:
        return
    print("Successfully connected to the database!")
    try:
        # 3. Example Query: Fetch version
        cur = conn.cursor()
        cur.execute("SELECT version();")
        version_row = cur.fetchone()
        print(f"Database version: {version_row[0]}")
        # 4. Example: Fetch data into a Pandas DataFrame
        # Remember to use the 'research' schema
        tables_sql = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'research'
LIMIT 5;
"""
        frame = pd.read_sql_query(tables_sql, conn)
        print("\nFirst 5 tables in 'research' schema:")
        print(frame)
    except Exception as e:
        print(f"Error executing query: {e}")
    finally:
        # 5. Always release the connection, even on failure.
        conn.close()
        print("Connection closed.")


if __name__ == "__main__":
    main()
5. Run the Script
Run your script using uv:
uv run main.py
uv run automatically ensures the environment is set up and dependencies are available before running the script.
PostgreSQL Connection Examples
Basic PostgreSQL Connection
# Basic connectivity check for the research PostgreSQL database:
# connect with .env credentials, list the 'research' schema tables,
# and run a quick sanity count on research.person.
import os
import psycopg2
import pandas as pd
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Connect to PostgreSQL database
conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")  # default PostgreSQL port when unset
)

# Test basic connection
cursor = conn.cursor()
cursor.execute("SELECT current_database(), current_user;")
result = cursor.fetchone()
print(f"Database: {result[0]}, User: {result[1]}")

# CRITICAL: Research data is in 'research' schema
# List available tables in research schema
# NOTE(review): pandas officially supports SQLAlchemy or sqlite3
# connections in read_sql_query; a raw psycopg2 connection works but may
# emit a UserWarning on recent pandas versions.
tables_query = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'research'
ORDER BY table_name
"""
tables = pd.read_sql_query(tables_query, conn)
print("\nAvailable research tables:")
print(tables)

# Quick research data check:
# SUM(CASE ...) counts the rows where adrc_cohort = 1 (the ADRC cohort).
person_count_query = """
SELECT
COUNT(*) as total_people,
SUM(CASE WHEN adrc_cohort = 1 THEN 1 ELSE 0 END) as adrc_cohort_count
FROM research.person
"""
person_count = pd.read_sql_query(person_count_query, conn)
print("\nPerson counts:")
print(person_count)

# Close connection
conn.close()
Python PostgreSQL Connection for Research Database
# Research database walkthrough: verify the connection, then run two
# example queries against the 'research' schema (OMOP-style tables —
# see the OMOP resources at the end of this guide).
import os
import psycopg2
import pandas as pd
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Connect to PostgreSQL research database
conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")  # default PostgreSQL port when unset
)

# Test connection
cursor = conn.cursor()
cursor.execute("SELECT version();")
db_version = cursor.fetchone()
print(f"Database version: {db_version[0]}")

# IMPORTANT: Research data is in the 'research' schema
# Always use 'research.table_name' unless you've configured a search path

# Example 1: Get ADRC cohort demographics
demographics_query = """
SELECT
person_id,
gender_source_value,
race_source_value,
ethnicity_source_value,
adrc_cohort,
source_system
FROM research.person
WHERE adrc_cohort = 1
LIMIT 10
"""
demographics = pd.read_sql_query(demographics_query, conn)
print("\nADRC Cohort Demographics:")
print(demographics)

# Example 2: Get condition names with concepts.
# Joining to research.concept translates condition_concept_id into a
# human-readable concept_name.
conditions_query = """
SELECT
co.person_id,
co.age_at_condition_start,
c.concept_name,
c.vocabulary_id,
co.condition_source_value
FROM research.condition_occurrence co
JOIN research.concept c ON co.condition_concept_id = c.concept_id
WHERE co.age_at_condition_start >= 65
LIMIT 10
"""
conditions = pd.read_sql_query(conditions_query, conn)
print("\nConditions:")
print(conditions)

# Close connection
conn.close()
Alternative with psycopg2 Connection Pooling
# Connection pooling: keep between 1 and 5 open connections and hand
# them out on demand instead of reconnecting for every query.
import os
import psycopg2
from psycopg2 import pool  # importing the submodule also exposes psycopg2.pool
import pandas as pd
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Create connection pool for better performance with large datasets
connection_pool = psycopg2.pool.SimpleConnectionPool(
    1,  # minimum connections
    5,  # maximum connections
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")
)

# Get connection from pool (must be handed back with putconn when done)
conn = connection_pool.getconn()

# Query research data with better performance for large datasets
recent_conditions_query = """
SELECT
co.person_id,
co.age_at_condition_start,
c.concept_name,
c.domain_id
FROM research.condition_occurrence co
JOIN research.concept c ON co.condition_concept_id = c.concept_id
WHERE co.age_at_condition_start >= 70
ORDER BY co.age_at_condition_start DESC
LIMIT 100
"""
recent_conditions = pd.read_sql_query(recent_conditions_query, conn)
print(recent_conditions.head())

# Return connection to pool
connection_pool.putconn(conn)

# Close all connections in pool when done
connection_pool.closeall()
Research Database Query Examples
Example 1: Patient Demographics Analysis
# Example 1: ADRC cohort demographics with condition counts.
import os  # FIX: os.getenv is used below but os was never imported
import pandas as pd
import psycopg2
from dotenv import load_dotenv

load_dotenv()

conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")
)

# ADRC cohort demographics with condition counts.
# The LEFT JOIN keeps patients that have no recorded conditions.
demographics_summary_query = """
SELECT
p.gender_source_value,
p.race_source_value,
COUNT(DISTINCT p.person_id) as patient_count,
COUNT(DISTINCT co.condition_occurrence_id) as total_conditions,
ROUND(AVG(co.age_at_condition_start), 1) as avg_age_at_first_condition
FROM research.person p
LEFT JOIN research.condition_occurrence co ON p.person_id = co.person_id
WHERE p.adrc_cohort = 1
GROUP BY p.gender_source_value, p.race_source_value
ORDER BY patient_count DESC
"""
demographics_summary = pd.read_sql_query(demographics_summary_query, conn)
print(demographics_summary)

# NOTE(review): Examples 2-5 below reuse `conn`; since it is closed here,
# re-run the connect block above before executing those snippets.
conn.close()
Example 2: Medication Analysis
# Example 2: Most common medications in ADRC cohort.
# NOTE(review): this snippet reuses `conn`, but Example 1 above ends with
# conn.close() — re-run the connect block before executing this query.
common_meds_query = """
SELECT
c.concept_name as medication_name,
c.vocabulary_id,
COUNT(DISTINCT de.person_id) as patient_count,
COUNT(*) as total_exposures,
ROUND(AVG(de.age_at_drug_start), 1) as avg_start_age
FROM research.drug_exposure de
JOIN research.concept c ON de.drug_concept_id = c.concept_id
JOIN research.person p ON de.person_id = p.person_id
WHERE p.adrc_cohort = 1
AND c.domain_id = 'Drug'
AND c.vocabulary_id IN ('RxNorm', 'NDC')
GROUP BY c.concept_name, c.vocabulary_id
HAVING COUNT(DISTINCT de.person_id) >= 10
ORDER BY patient_count DESC
LIMIT 20
"""
# HAVING >= 10 filters out rarely-used medications; patient_count is
# distinct patients while total_exposures counts every exposure row.
common_meds = pd.read_sql_query(common_meds_query, conn)
print(common_meds)
Example 3: Lab Values Analysis
# Example 3: Recent lab values with normal ranges.
# NOTE(review): assumes an open `conn` from an earlier snippet.
lab_values_query = """
SELECT
c.concept_name as lab_name,
m.value_as_number,
u.concept_name as unit,
m.range_low,
m.range_high,
CASE
WHEN m.value_as_number < m.range_low THEN 'Low'
WHEN m.value_as_number > m.range_high THEN 'High'
ELSE 'Normal'
END as result_flag,
m.age_at_measurement
FROM research.measurement m
JOIN research.concept c ON m.measurement_concept_id = c.concept_id
LEFT JOIN research.concept u ON m.unit_concept_id = u.concept_id
JOIN research.person p ON m.person_id = p.person_id
WHERE p.adrc_cohort = 1
AND m.value_as_number IS NOT NULL
AND m.age_at_measurement >= 65
AND c.concept_name ILIKE '%cholesterol%'
ORDER BY m.age_at_measurement DESC
LIMIT 100
"""
# The LEFT JOIN on the unit concept keeps measurements without a unit.
# Rows whose range_low/range_high are NULL flag as 'Normal', because
# comparisons against NULL are never true in SQL.
lab_values = pd.read_sql_query(lab_values_query, conn)
print(lab_values)
Example 4: Temporal Analysis
# Example 4: Age progression of conditions — dementia diagnoses bucketed
# into coarse age groups via the CASE expression.
# NOTE(review): assumes an open `conn` from an earlier snippet.
condition_timeline_query = """
SELECT
co.person_id,
c.concept_name as condition_name,
co.age_at_condition_start,
CASE
WHEN co.age_at_condition_start < 65 THEN 'Under 65'
WHEN co.age_at_condition_start BETWEEN 65 AND 74 THEN '65-74'
WHEN co.age_at_condition_start BETWEEN 75 AND 84 THEN '75-84'
ELSE '85+'
END as age_group
FROM research.condition_occurrence co
JOIN research.concept c ON co.condition_concept_id = c.concept_id
JOIN research.person p ON co.person_id = p.person_id
WHERE p.adrc_cohort = 1
AND c.domain_id = 'Condition'
AND c.concept_name ILIKE '%dementia%'
ORDER BY co.person_id, co.age_at_condition_start
"""
condition_timeline = pd.read_sql_query(condition_timeline_query, conn)
print(condition_timeline)
Example 5: Mortality Analysis
# Example 5: Survival analysis data.
# LEFT JOINs keep living patients (no research.death row); the CASE on
# d.person_id turns the presence of a death record into a 0/1 flag.
# NOTE(review): assumes an open `conn` from an earlier snippet.
survival_data_query = """
SELECT
p.person_id,
p.gender_source_value,
d.age_at_death,
dc.concept_name as cause_of_death,
CASE WHEN d.person_id IS NOT NULL THEN 1 ELSE 0 END as deceased
FROM research.person p
LEFT JOIN research.death d ON p.person_id = d.person_id
LEFT JOIN research.concept dc ON d.cause_concept_id = dc.concept_id
WHERE p.adrc_cohort = 1
ORDER BY p.person_id
"""
survival_data = pd.read_sql_query(survival_data_query, conn)
print(survival_data)
Schema Configuration (Optional)
# Set search path to avoid typing 'research.' every time.
# NOTE(review): assumes an open `conn` from an earlier snippet.
cursor = conn.cursor()
cursor.execute("SET search_path TO research, public;")
conn.commit()

# Now you can query without schema prefix
simple_query = """
SELECT COUNT(*) as total_patients
FROM person
WHERE adrc_cohort = 1
"""
simple_result = pd.read_sql_query(simple_query, conn)
print(simple_result)

# NOTE: This setting only lasts for your current connection
# You'll need to set it again each time you reconnect
Important Notes for Research Schema
Schema Prefix Requirements
- Always use research.table_name unless you've set the search path
- Available tables: person, condition_occurrence, drug_exposure, measurement, observation, procedure_occurrence, visit_occurrence, death, concept
- The concept table is essential for translating concept IDs to readable names
Key Fields to Remember
- Age fields: All temporal data uses age_at_* instead of dates
- ADRC cohort: Filter with adrc_cohort = 1 for the main research population
- Concept joins: Always join to research.concept to get readable names
- Standard concepts: Use standard_concept = 'S' for preferred terminology
Secure Connection Management
Using Environment Variables (Already Covered)
The .env file approach shown earlier is the recommended method for secure credential management.
Using Configuration Files
# Interactive credential entry: read host/database/user from the
# environment, but prompt for the password instead of storing it.
import os
import psycopg2
import getpass
from dotenv import load_dotenv

# Option 1: Load from .env file (recommended)
load_dotenv()

# Option 2: Create a config dictionary (without password).
db_config = {
    "host": os.getenv("DB_HOST", "your_server_host"),
    # Honor DB_PORT like every other example in this guide instead of
    # hard-coding 5432 (psycopg2 accepts the port as a string).
    "port": os.getenv("DB_PORT", "5432"),
    "database": os.getenv("DB_NAME", "your_database_name"),
    "user": os.getenv("DB_USER", "your_username")
}

# Prompt for password securely; getpass does not echo the input.
password = getpass.getpass("Database password: ")

# Connect with config and password
conn = psycopg2.connect(
    host=db_config["host"],
    port=db_config["port"],
    database=db_config["database"],
    user=db_config["user"],
    password=password
)
Common Research Database Operations
Reading Research Data
# Reading research data into pandas, including a chunked-read pattern
# for tables too large to load in one go.
import os  # FIX: os.getenv is used below but os was never imported
import pandas as pd
import psycopg2
from dotenv import load_dotenv

load_dotenv()

conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")
)

# Read specific research tables
person_data = pd.read_sql_query("SELECT * FROM research.person LIMIT 1000", conn)

# Execute research-specific queries
condition_summary_query = """
SELECT
c.concept_name,
COUNT(DISTINCT co.person_id) as patient_count,
ROUND(AVG(co.age_at_condition_start), 1) as avg_age
FROM research.condition_occurrence co
JOIN research.concept c ON co.condition_concept_id = c.concept_id
JOIN research.person p ON co.person_id = p.person_id
WHERE p.adrc_cohort = 1
GROUP BY c.concept_name
ORDER BY patient_count DESC
"""
condition_summary = pd.read_sql_query(condition_summary_query, conn)
print(condition_summary)

# Read in chunks for large research datasets. ORDER BY measurement_id
# keeps the LIMIT/OFFSET pages stable across iterations.
chunk_size = 5000
offset = 0
while True:
    chunk_query = f"""
    SELECT *
    FROM research.measurement
    WHERE age_at_measurement >= 65
    ORDER BY measurement_id
    LIMIT {chunk_size} OFFSET {offset}
    """
    chunk = pd.read_sql_query(chunk_query, conn)
    if chunk.empty:
        break  # no more rows to fetch
    # Process chunk
    print(f"Processed {len(chunk)} measurements")
    # Analyze chunk here
    # ...
    offset += chunk_size

conn.close()
Research Data Analysis Patterns
# Note: Research schema is read-only for analysis
# No INSERT/UPDATE/DELETE operations allowed
# Common pattern: Join with concept table for readable names.
# NOTE(review): assumes an open `conn` from an earlier snippet.
readable_data_query = """
SELECT
de.person_id,
de.age_at_drug_start,
c.concept_name as drug_name,
de.days_supply
FROM research.drug_exposure de
JOIN research.concept c ON de.drug_concept_id = c.concept_id
WHERE de.age_at_drug_start >= 60
"""
readable_data = pd.read_sql_query(readable_data_query, conn)
print(readable_data)
Performance Tips for Research Data
Chunked Reading for Large Tables
def process_large_table(conn, table_name, chunk_size=10000):
    """
    Process large research tables in chunks to avoid memory issues.

    Args:
        conn: Open database connection.
        table_name: Name of the table (in the 'research' schema) to process.
        chunk_size: Number of rows to fetch per query.

    Raises:
        ValueError: If table_name is not a plain SQL identifier. The name
            is interpolated directly into the query text (identifiers
            cannot be bound parameters), so it must be validated first.
    """
    import re

    # Whitelist-validate the identifier before f-string interpolation to
    # prevent SQL injection and catch typos early.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table_name):
        raise ValueError(f"Invalid table name: {table_name!r}")

    # NOTE(review): the WHERE clause assumes every processed table has an
    # adrc_cohort column; this guide only documents it on research.person.
    offset = 0
    while True:
        # LIMIT/OFFSET paging; ORDER BY keeps the pages stable.
        query = f"""
        SELECT *
        FROM research.{table_name}
        WHERE adrc_cohort = 1
        ORDER BY person_id
        LIMIT {chunk_size} OFFSET {offset}
        """
        chunk = pd.read_sql_query(query, conn)
        if chunk.empty:
            break
        # Process your chunk here
        print(f"Processed {len(chunk)} rows")
        # Example: Analyze chunk
        # analyze_chunk(chunk)
        offset += chunk_size
# Usage
# NOTE(review): process_large_table filters on adrc_cohort = 1; confirm
# research.measurement actually has that column — this guide only
# documents it on research.person.
process_large_table(conn, "measurement", chunk_size=5000)
Optimized Research Queries
import time

# Use indexes efficiently - filter by person_id first.
# The IN-subquery restricts the join to ADRC-cohort patients before the
# age filter is applied.
efficient_query = """
SELECT co.*, c.concept_name
FROM research.condition_occurrence co
JOIN research.concept c ON co.condition_concept_id = c.concept_id
WHERE co.person_id IN (
SELECT person_id FROM research.person WHERE adrc_cohort = 1
)
AND co.age_at_condition_start >= 65
"""

# Measure query performance with simple wall-clock timing
start_time = time.time()
result = pd.read_sql_query(efficient_query, conn)
elapsed_time = time.time() - start_time
print(f"Query executed in {elapsed_time:.2f} seconds")
print(f"Retrieved {len(result)} rows")

# Count query for quick checks — much cheaper than fetching all rows
count_query = "SELECT COUNT(*) FROM research.condition_occurrence"
start_time = time.time()
count_result = pd.read_sql_query(count_query, conn)
elapsed_time = time.time() - start_time
print(f"Count query executed in {elapsed_time:.2f} seconds")
print(f"Total rows: {count_result.iloc[0, 0]}")
Troubleshooting Common Issues
Package Installation Problems
# Check if PostgreSQL client libraries are installed
dpkg -l | grep libpq-dev
# Only install if missing (no output from above command)
sudo apt install libpq-dev build-essential
# If psycopg2 fails to build, fall back to the pre-built binary wheel
# (psycopg2-binary bundles libpq, so no system packages are needed)
pip install psycopg2-binary
Note: The libpq-dev package contains PostgreSQL client libraries that Python packages need to compile and connect to PostgreSQL databases. Many systems already have these installed. If your Python package installation succeeds without errors, you likely don't need to install additional system dependencies.
Connection Problems
# Troubleshooting helper: attempt a connection and report the failure
# mode instead of crashing.
import psycopg2
from dotenv import load_dotenv
import os

load_dotenv()

# Test connection with error handling
try:
    conn = psycopg2.connect(
        host=os.getenv("DB_HOST"),
        database=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        port=os.getenv("DB_PORT", "5432")
    )
    # Test basic query
    cursor = conn.cursor()
    cursor.execute("SELECT current_user, current_database()")
    result = cursor.fetchone()
    print("Connection successful!")
    print(f"User: {result[0]}, Database: {result[1]}")
    conn.close()
except psycopg2.Error as e:
    # Driver-level failures: wrong host/credentials, refused connection, etc.
    print(f"Connection failed: {e}")
except Exception as e:
    # Anything else, e.g. a misconfigured environment
    print(f"Unexpected error: {e}")
Research Schema Access Issues
import pandas as pd

# Verify the research schema exists and you have access to it.
# NOTE(review): assumes an open `conn` from an earlier snippet.
schema_check_query = """
SELECT
schemaname,
tablename
FROM pg_tables
WHERE schemaname = 'research'
ORDER BY tablename
"""
schema_check = pd.read_sql_query(schema_check_query, conn)
# An empty result means either the schema is missing or this role
# cannot see it.
if len(schema_check) == 0:
    print("Research schema not found or no access")
else:
    print("Available research tables:")
    print(schema_check['tablename'].tolist())
Best Practices for Research Data
- Always close connections: Use conn.close() when finished, or use context managers
- Filter by ADRC cohort: Use WHERE adrc_cohort = 1 for the main research population
- Use schema prefix: Always specify research.table_name
- Join with concepts: Always join to research.concept for readable names
- Handle large datasets: Use chunked reading for measurements and observations
- Monitor performance: Use time.time() for query optimization
- Use context managers: Ensure connections are properly closed even if errors occur
import os
import psycopg2
import pandas as pd
from dotenv import load_dotenv
from contextlib import contextmanager

# Load DB_* credentials from .env before any connection is attempted
load_dotenv()
@contextmanager
def get_db_connection():
    """
    Yield a PostgreSQL connection built from DB_* environment variables.

    The connection is always closed on exit, even when an error occurs;
    psycopg2 errors are printed and re-raised to the caller.
    """
    params = dict(
        host=os.getenv("DB_HOST"),
        database=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        port=os.getenv("DB_PORT", "5432"),
    )
    conn = None
    try:
        conn = psycopg2.connect(**params)
        yield conn
    except psycopg2.Error as e:
        print(f"Database error: {e}")
        raise
    finally:
        # conn stays None when connect() itself failed
        if conn:
            conn.close()
def research_analysis():
    """Fetch ADRC-cohort conditions (age >= 65) with readable concept names.

    Returns a DataFrame on success, or None when the query fails.
    """
    # Filter to the ADRC cohort and join with concepts for readable names.
    sql = """
SELECT
p.person_id,
p.gender_source_value,
co.age_at_condition_start,
c.concept_name as condition_name
FROM research.person p
JOIN research.condition_occurrence co ON p.person_id = co.person_id
JOIN research.concept c ON co.condition_concept_id = c.concept_id
WHERE p.adrc_cohort = 1
AND co.age_at_condition_start >= 65
ORDER BY co.age_at_condition_start
"""
    try:
        # The context manager guarantees the connection is closed.
        with get_db_connection() as conn:
            return pd.read_sql_query(sql, conn)
    except Exception as e:
        print(f"Query failed: {e}")
        return None
# Usage
results = research_analysis()
# research_analysis() returns None on failure, so guard before using it
if results is not None:
    print(f"Retrieved {len(results)} records")
    print(results.head())
Additional Resources
Python PostgreSQL Packages
- psycopg2 Documentation - PostgreSQL adapter for Python
- pandas.read_sql Documentation - Reading SQL queries into DataFrames
- python-dotenv Documentation - Environment variable management
OMOP and Research Data
- OMOP Common Data Model - Official OMOP CDM documentation
- Athena Vocabulary Browser - Search OMOP concepts and vocabularies
- OHDSI Tools - Additional analysis tools for OMOP data