Skip to content

Python PostgreSQL Connection Guide

This guide covers how to set up a Python environment using uv, manage dependencies, and connect to a PostgreSQL database securely using environment variables.

1. Install uv (Fast Python Package Installer)

uv is an extremely fast Python package installer and resolver. It is designed to save disk space and speed up your workflow by using a global cache for dependencies, meaning you don't need to re-download the same packages for every project.

Installation on Linux (HPC)

On most Linux systems (including HPC environments), you can install uv with a single command:

curl -LsSf https://astral.sh/uv/install.sh | sh

Verify the installation:

uv --version

2. Initialize Project & Install Dependencies

Create a new directory for your project and initialize it.

# Create project directory
mkdir my-research-project
cd my-research-project

# Initialize a new Python project with uv
uv init

uv init vs uv venv

  • uv init: Creates a new project structure (including a pyproject.toml file). It sets up a modern Python project managed by uv, allowing you to declare dependencies that are reproducible across different environments. This is recommended for new projects.
  • uv venv: Just creates a virtual environment in the current directory (similar to python -m venv). It doesn't create a project file (pyproject.toml) or manage dependencies for you. Use this if you just want a quick, throwaway environment or are managing a legacy project without a pyproject.toml.

Add the necessary libraries for PostgreSQL connection and environment variable management. We will use psycopg2-binary for the database driver and python-dotenv to load credentials.

# Add dependencies
uv add psycopg2-binary python-dotenv pandas

This command creates a virtual environment (if one doesn't exist), installs the packages, and updates your pyproject.toml file.

3. Secure Database Credentials

Never hardcode your database passwords in your code. Use a .env file instead.

  1. Create a file named .env in your project root:
touch .env
  2. Add your database credentials to the .env file:
# .env file
DB_HOST=your_server_host
DB_PORT=5432
DB_NAME=your_database_name
DB_USER=your_username
DB_PASSWORD=your_secure_password
  3. Important (for git users): add .env to your .gitignore file to prevent it from being committed to version control.
echo ".env" >> .gitignore

4. Connect to PostgreSQL

Here is a complete Python script (main.py) that loads the credentials and connects to the database.

import os
import psycopg2
import pandas as pd
from dotenv import load_dotenv

# 1. Load environment variables from .env file
# (populates DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT for os.getenv)
load_dotenv()

def get_db_connection():
    """Open a PostgreSQL connection from the DB_* environment variables.

    Returns the new psycopg2 connection on success, or None if the
    attempt fails (the error is printed rather than raised).
    """
    settings = {
        "host": os.getenv("DB_HOST"),
        "database": os.getenv("DB_NAME"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
        # Fall back to the standard PostgreSQL port when DB_PORT is unset.
        "port": os.getenv("DB_PORT", "5432"),
    }
    try:
        return psycopg2.connect(**settings)
    except Exception as e:
        print(f"Error connecting to database: {e}")
        return None

def main():
    """Connect to the database, run two demo queries, and clean up."""
    # 2. Connect to the database; stop here if it failed
    # (get_db_connection already printed the error).
    conn = get_db_connection()
    if not conn:
        return

    print("Successfully connected to the database!")

    try:
        # 3. Example Query: Fetch version
        cursor = conn.cursor()
        cursor.execute("SELECT version();")
        db_version = cursor.fetchone()
        print(f"Database version: {db_version[0]}")

        # 4. Example: Fetch data into a Pandas DataFrame
        # Remember to use the 'research' schema
        query = """
                SELECT table_name 
                FROM information_schema.tables 
                WHERE table_schema = 'research'
                LIMIT 5;
            """
        df = pd.read_sql_query(query, conn)
        print("\nFirst 5 tables in 'research' schema:")
        print(df)

    except Exception as e:
        print(f"Error executing query: {e}")
    finally:
        # 5. Close the connection
        conn.close()
        print("Connection closed.")

if __name__ == "__main__":
    main()

5. Run the Script

Run your script using uv:

uv run main.py

uv run automatically ensures the environment is set up and dependencies are available before running the script.


PostgreSQL Connection Examples

Basic PostgreSQL Connection

import os
import psycopg2
import pandas as pd
from dotenv import load_dotenv

# Load environment variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT)
load_dotenv()

# Connect to PostgreSQL database
conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")
)

# Test basic connection: report who we are connected as, and to what
cursor = conn.cursor()
cursor.execute("SELECT current_database(), current_user;")
result = cursor.fetchone()
print(f"Database: {result[0]}, User: {result[1]}")

# CRITICAL: Research data is in 'research' schema
# List available tables in research schema
# NOTE: pandas officially supports SQLAlchemy connectables; a raw psycopg2
# connection works but newer pandas versions may emit a UserWarning here.
tables_query = """
    SELECT table_name 
    FROM information_schema.tables 
    WHERE table_schema = 'research'
    ORDER BY table_name
"""
tables = pd.read_sql_query(tables_query, conn)
print("\nAvailable research tables:")
print(tables)

# Quick research data check
# NOTE(review): assumes research.person has an adrc_cohort flag column
# (1 = member of the ADRC cohort) -- consistent with the other examples.
person_count_query = """
    SELECT 
        COUNT(*) as total_people,
        SUM(CASE WHEN adrc_cohort = 1 THEN 1 ELSE 0 END) as adrc_cohort_count
    FROM research.person
"""
person_count = pd.read_sql_query(person_count_query, conn)
print("\nPerson counts:")
print(person_count)

# Close connection
conn.close()

Python PostgreSQL Connection for Research Database

import os
import psycopg2
import pandas as pd
from dotenv import load_dotenv

# Load environment variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT)
load_dotenv()

# Connect to PostgreSQL research database
conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")
)

# Test connection by fetching the server version string
cursor = conn.cursor()
cursor.execute("SELECT version();")
db_version = cursor.fetchone()
print(f"Database version: {db_version[0]}")

# IMPORTANT: Research data is in the 'research' schema
# Always use 'research.table_name' unless you've configured a search path

# Example 1: Get ADRC cohort demographics
# (adrc_cohort = 1 marks members of the ADRC research cohort)
demographics_query = """
    SELECT 
        person_id,
        gender_source_value,
        race_source_value,
        ethnicity_source_value,
        adrc_cohort,
        source_system
    FROM research.person 
    WHERE adrc_cohort = 1
    LIMIT 10
"""
demographics = pd.read_sql_query(demographics_query, conn)
print("\nADRC Cohort Demographics:")
print(demographics)

# Example 2: Get condition names with concepts
# Joining to research.concept translates numeric concept IDs into
# human-readable names (concept_name).
conditions_query = """
    SELECT 
        co.person_id,
        co.age_at_condition_start,
        c.concept_name,
        c.vocabulary_id,
        co.condition_source_value
    FROM research.condition_occurrence co
    JOIN research.concept c ON co.condition_concept_id = c.concept_id
    WHERE co.age_at_condition_start >= 65
    LIMIT 10
"""
conditions = pd.read_sql_query(conditions_query, conn)
print("\nConditions:")
print(conditions)

# Close connection
conn.close()

Alternative with psycopg2 Connection Pooling

import os
import psycopg2
from psycopg2 import pool
import pandas as pd
from dotenv import load_dotenv

# Load environment variables (DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT)
load_dotenv()

# Create connection pool for better performance with large datasets
connection_pool = psycopg2.pool.SimpleConnectionPool(
    1,  # minimum connections
    5,  # maximum connections
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")
)

# try/finally guarantees the pooled connection is returned and the pool is
# closed even if the query raises; previously an error leaked the connection
# and left the pool open.
try:
    # Get connection from pool
    conn = connection_pool.getconn()
    try:
        # Query research data with better performance for large datasets
        recent_conditions_query = """
            SELECT 
                co.person_id,
                co.age_at_condition_start,
                c.concept_name,
                c.domain_id
            FROM research.condition_occurrence co
            JOIN research.concept c ON co.condition_concept_id = c.concept_id
            WHERE co.age_at_condition_start >= 70
            ORDER BY co.age_at_condition_start DESC
            LIMIT 100
        """
        recent_conditions = pd.read_sql_query(recent_conditions_query, conn)
        print(recent_conditions.head())
    finally:
        # Return connection to pool, even on error
        connection_pool.putconn(conn)
finally:
    # Close all connections in pool when done
    connection_pool.closeall()

Research Database Query Examples

Example 1: Patient Demographics Analysis

import os  # was missing: os.getenv is used below for the connection settings
import pandas as pd
import psycopg2
from dotenv import load_dotenv

# Load DB_* credentials from .env and open a connection
load_dotenv()
conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")
)

# ADRC cohort demographics with condition counts
# LEFT JOIN keeps people with zero recorded conditions in the patient counts.
# NOTE(review): AVG(age_at_condition_start) averages over ALL condition rows,
# not each person's first condition, despite the column alias -- confirm intent.
demographics_summary_query = """
    SELECT 
        p.gender_source_value,
        p.race_source_value,
        COUNT(DISTINCT p.person_id) as patient_count,
        COUNT(DISTINCT co.condition_occurrence_id) as total_conditions,
        ROUND(AVG(co.age_at_condition_start), 1) as avg_age_at_first_condition
    FROM research.person p
    LEFT JOIN research.condition_occurrence co ON p.person_id = co.person_id
    WHERE p.adrc_cohort = 1
    GROUP BY p.gender_source_value, p.race_source_value
    ORDER BY patient_count DESC
"""

demographics_summary = pd.read_sql_query(demographics_summary_query, conn)
print(demographics_summary)

conn.close()

Example 2: Medication Analysis

# Most common medications in ADRC cohort
# NOTE(review): this snippet reuses `conn` and `pd` from an earlier example;
# the Example 1 connection was closed with conn.close() -- reconnect first.
# HAVING >= 10 distinct patients filters out rarely-prescribed drugs.
common_meds_query = """
    SELECT 
        c.concept_name as medication_name,
        c.vocabulary_id,
        COUNT(DISTINCT de.person_id) as patient_count,
        COUNT(*) as total_exposures,
        ROUND(AVG(de.age_at_drug_start), 1) as avg_start_age
    FROM research.drug_exposure de
    JOIN research.concept c ON de.drug_concept_id = c.concept_id
    JOIN research.person p ON de.person_id = p.person_id
    WHERE p.adrc_cohort = 1
        AND c.domain_id = 'Drug'
        AND c.vocabulary_id IN ('RxNorm', 'NDC')
    GROUP BY c.concept_name, c.vocabulary_id
    HAVING COUNT(DISTINCT de.person_id) >= 10
    ORDER BY patient_count DESC
    LIMIT 20
"""

common_meds = pd.read_sql_query(common_meds_query, conn)
print(common_meds)

Example 3: Lab Values Analysis

# Recent lab values with normal ranges
# NOTE(review): requires an open `conn` and `pd` from an earlier snippet.
# The CASE flags each value against the row's own range_low/range_high;
# the unit join is a LEFT JOIN so rows with no unit concept are kept.
lab_values_query = """
    SELECT 
        c.concept_name as lab_name,
        m.value_as_number,
        u.concept_name as unit,
        m.range_low,
        m.range_high,
        CASE 
            WHEN m.value_as_number < m.range_low THEN 'Low'
            WHEN m.value_as_number > m.range_high THEN 'High'
            ELSE 'Normal'
        END as result_flag,
        m.age_at_measurement
    FROM research.measurement m
    JOIN research.concept c ON m.measurement_concept_id = c.concept_id
    LEFT JOIN research.concept u ON m.unit_concept_id = u.concept_id
    JOIN research.person p ON m.person_id = p.person_id
    WHERE p.adrc_cohort = 1
        AND m.value_as_number IS NOT NULL
        AND m.age_at_measurement >= 65
        AND c.concept_name ILIKE '%cholesterol%'
    ORDER BY m.age_at_measurement DESC
    LIMIT 100
"""

lab_values = pd.read_sql_query(lab_values_query, conn)
print(lab_values)

Example 4: Temporal Analysis

# Age progression of conditions
# NOTE(review): requires an open `conn` and `pd` from an earlier snippet.
# Buckets onset age into groups; ages use age_at_* fields rather than dates.
condition_timeline_query = """
    SELECT 
        co.person_id,
        c.concept_name as condition_name,
        co.age_at_condition_start,
        CASE 
            WHEN co.age_at_condition_start < 65 THEN 'Under 65'
            WHEN co.age_at_condition_start BETWEEN 65 AND 74 THEN '65-74'
            WHEN co.age_at_condition_start BETWEEN 75 AND 84 THEN '75-84'
            ELSE '85+'
        END as age_group
    FROM research.condition_occurrence co
    JOIN research.concept c ON co.condition_concept_id = c.concept_id
    JOIN research.person p ON co.person_id = p.person_id
    WHERE p.adrc_cohort = 1
        AND c.domain_id = 'Condition'
        AND c.concept_name ILIKE '%dementia%'
    ORDER BY co.person_id, co.age_at_condition_start
"""

condition_timeline = pd.read_sql_query(condition_timeline_query, conn)
print(condition_timeline)

Example 5: Mortality Analysis

# Survival analysis data
# NOTE(review): requires an open `conn` and `pd` from an earlier snippet.
# LEFT JOIN to research.death keeps living cohort members; the CASE derives
# a deceased indicator (1 if a death row exists, else 0).
survival_data_query = """
    SELECT 
        p.person_id,
        p.gender_source_value,
        d.age_at_death,
        dc.concept_name as cause_of_death,
        CASE WHEN d.person_id IS NOT NULL THEN 1 ELSE 0 END as deceased
    FROM research.person p
    LEFT JOIN research.death d ON p.person_id = d.person_id
    LEFT JOIN research.concept dc ON d.cause_concept_id = dc.concept_id
    WHERE p.adrc_cohort = 1
    ORDER BY p.person_id
"""

survival_data = pd.read_sql_query(survival_data_query, conn)
print(survival_data)

Schema Configuration (Optional)

# Set search path to avoid typing 'research.' every time
# (unqualified table names will resolve against these schemas, in order)
# NOTE(review): requires an open `conn` and `pd` from an earlier snippet.
cursor = conn.cursor()
cursor.execute("SET search_path TO research, public;")
conn.commit()

# Now you can query without schema prefix
simple_query = """
    SELECT COUNT(*) as total_patients 
    FROM person 
    WHERE adrc_cohort = 1
"""

simple_result = pd.read_sql_query(simple_query, conn)
print(simple_result)

# NOTE: This setting only lasts for your current connection
# You'll need to set it again each time you reconnect

Important Notes for Research Schema

Schema Prefix Requirements

  • Always use research.table_name unless you've set the search path
  • Available tables: person, condition_occurrence, drug_exposure, measurement, observation, procedure_occurrence, visit_occurrence, death, concept
  • The concept table is essential for translating concept IDs to readable names

Key Fields to Remember

  • Age fields: All temporal data uses age_at_* instead of dates
  • ADRC cohort: Filter with adrc_cohort = 1 for main research population
  • Concept joins: Always join to research.concept to get readable names
  • Standard concepts: Use standard_concept = 'S' for preferred terminology

Secure Connection Management

Using Environment Variables (Already Covered)

The .env file approach shown earlier is the recommended method for secure credential management.

Using Configuration Files

import os
import psycopg2
import getpass
from dotenv import load_dotenv

# Option 1: Load from .env file (recommended)
load_dotenv()

# Option 2: Create a config dictionary (without password)
# The port now honors DB_PORT like every other example in this guide,
# falling back to the PostgreSQL default of 5432 when unset.
db_config = {
    "host": os.getenv("DB_HOST", "your_server_host"),
    "port": os.getenv("DB_PORT", "5432"),
    "database": os.getenv("DB_NAME", "your_database_name"),
    "user": os.getenv("DB_USER", "your_username")
}

# Prompt for password securely (input is not echoed and never written to disk)
password = getpass.getpass("Database password: ")

# Connect with config and password
conn = psycopg2.connect(
    host=db_config["host"],
    port=db_config["port"],
    database=db_config["database"],
    user=db_config["user"],
    password=password
)

Common Research Database Operations

Reading Research Data

import os  # was missing: os.getenv is used below for the connection settings
import pandas as pd
import psycopg2
from dotenv import load_dotenv

# Load DB_* credentials from .env and open a connection
load_dotenv()
conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    database=os.getenv("DB_NAME"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    port=os.getenv("DB_PORT", "5432")
)

# Read specific research tables
person_data = pd.read_sql_query("SELECT * FROM research.person LIMIT 1000", conn)

# Execute research-specific queries
condition_summary_query = """
    SELECT 
        c.concept_name,
        COUNT(DISTINCT co.person_id) as patient_count,
        ROUND(AVG(co.age_at_condition_start), 1) as avg_age
    FROM research.condition_occurrence co
    JOIN research.concept c ON co.condition_concept_id = c.concept_id
    JOIN research.person p ON co.person_id = p.person_id
    WHERE p.adrc_cohort = 1
    GROUP BY c.concept_name
    ORDER BY patient_count DESC
"""

condition_summary = pd.read_sql_query(condition_summary_query, conn)
print(condition_summary)

# Read in chunks for large research datasets.
# ORDER BY measurement_id keeps LIMIT/OFFSET pages stable between queries
# (presumably measurement_id is the table's unique key -- confirm).
chunk_size = 5000
offset = 0

while True:
    # chunk_size and offset are ints controlled by this script, so the
    # f-string interpolation below cannot inject SQL.
    chunk_query = f"""
        SELECT * 
        FROM research.measurement 
        WHERE age_at_measurement >= 65
        ORDER BY measurement_id
        LIMIT {chunk_size} OFFSET {offset}
    """
    chunk = pd.read_sql_query(chunk_query, conn)

    # An empty page means we have paged past the last matching row.
    if chunk.empty:
        break

    # Process chunk
    print(f"Processed {len(chunk)} measurements")

    # Analyze chunk here
    # ...

    offset += chunk_size

conn.close()

Research Data Analysis Patterns

# Note: Research schema is read-only for analysis
# No INSERT/UPDATE/DELETE operations allowed

# Common pattern: Join with concept table for readable names
readable_data_query = """
    SELECT 
        de.person_id,
        de.age_at_drug_start,
        c.concept_name as drug_name,
        de.days_supply
    FROM research.drug_exposure de
    JOIN research.concept c ON de.drug_concept_id = c.concept_id
    WHERE de.age_at_drug_start >= 60
"""

readable_data = pd.read_sql_query(readable_data_query, conn)
print(readable_data)

Performance Tips for Research Data

Chunked Reading for Large Tables

def process_large_table(conn, table_name, chunk_size=10000):
    """
    Process large research tables in chunks to avoid memory issues.

    Args:
        conn: Database connection (any DB-API compatible connection).
        table_name: Name of the table (in the 'research' schema) to process.
        chunk_size: Number of rows to fetch per query.

    Returns:
        Total number of rows processed across all chunks.

    Raises:
        ValueError: If table_name is not a plain identifier. The name is
            interpolated into the SQL text, so this guards against SQL
            injection and malformed queries.
    """
    # table_name is spliced into the query string below; refuse anything
    # that is not a simple identifier.
    if not table_name.isidentifier():
        raise ValueError(f"Invalid table name: {table_name!r}")

    offset = 0
    total_rows = 0

    while True:
        # Query with LIMIT and OFFSET for chunking.
        # NOTE(review): this assumes the table has adrc_cohort and person_id
        # columns -- confirm for tables other than research.person.
        query = f"""
            SELECT * 
            FROM research.{table_name} 
            WHERE adrc_cohort = 1
            ORDER BY person_id
            LIMIT {chunk_size} OFFSET {offset}
        """

        chunk = pd.read_sql_query(query, conn)

        # Empty page: we've consumed every matching row.
        if chunk.empty:
            break

        # Process your chunk here
        print(f"Processed {len(chunk)} rows")
        total_rows += len(chunk)

        # Example: Analyze chunk
        # analyze_chunk(chunk)

        offset += chunk_size

    return total_rows

# Usage
# NOTE(review): the chunking query filters on adrc_cohort, which exists on
# research.person -- confirm research.measurement also has that column
# before running this on "measurement".
process_large_table(conn, "measurement", chunk_size=5000)

Optimized Research Queries

import time

# Use indexes efficiently - filter by person_id first
# NOTE(review): requires an open `conn` and `pd` from an earlier snippet.
efficient_query = """
    SELECT co.*, c.concept_name
    FROM research.condition_occurrence co
    JOIN research.concept c ON co.condition_concept_id = c.concept_id
    WHERE co.person_id IN (
        SELECT person_id FROM research.person WHERE adrc_cohort = 1
    )
    AND co.age_at_condition_start >= 65
"""

# Measure query performance (wall-clock time around fetch + DataFrame build)
start_time = time.time()
result = pd.read_sql_query(efficient_query, conn)
elapsed_time = time.time() - start_time
print(f"Query executed in {elapsed_time:.2f} seconds")
print(f"Retrieved {len(result)} rows")

# Count query for quick checks (can still be slow on very large tables)
count_query = "SELECT COUNT(*) FROM research.condition_occurrence"
start_time = time.time()
count_result = pd.read_sql_query(count_query, conn)
elapsed_time = time.time() - start_time
print(f"Count query executed in {elapsed_time:.2f} seconds")
print(f"Total rows: {count_result.iloc[0, 0]}")

Troubleshooting Common Issues

Package Installation Problems

# Check if PostgreSQL client libraries are installed
dpkg -l | grep libpq-dev

# Only install if missing (no output from above command)
sudo apt install libpq-dev build-essential

# If psycopg2 fails to build from source, install the pre-built binary package
pip install psycopg2-binary

Note: The libpq-dev package contains PostgreSQL client libraries that Python packages need to compile and connect to PostgreSQL databases. Many systems already have these installed. If your Python package installation succeeds without errors, you likely don't need to install additional system dependencies.

Connection Problems

import psycopg2
from dotenv import load_dotenv
import os

load_dotenv()

# Attempt a connection and run a sanity-check query, reporting any failure.
try:
    connection = psycopg2.connect(
        host=os.getenv("DB_HOST"),
        database=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        port=os.getenv("DB_PORT", "5432")
    )

    # Confirm we can execute a statement and see who/where we are.
    cur = connection.cursor()
    cur.execute("SELECT current_user, current_database()")
    row = cur.fetchone()
    print("Connection successful!")
    print(f"User: {row[0]}, Database: {row[1]}")

    connection.close()

except psycopg2.Error as e:
    # Driver-level failures (bad host, auth, refused connection, ...)
    print(f"Connection failed: {e}")
except Exception as e:
    # Anything else (e.g. misconfigured environment)
    print(f"Unexpected error: {e}")

Research Schema Access Issues

import pandas as pd

# Verify research schema exists and you have access
# pg_tables lists tables visible to the current role; an empty result means
# the schema is absent or you lack privileges on it.
# NOTE(review): requires an open `conn` from an earlier snippet.
schema_check_query = """
    SELECT 
        schemaname,
        tablename
    FROM pg_tables 
    WHERE schemaname = 'research'
    ORDER BY tablename
"""

schema_check = pd.read_sql_query(schema_check_query, conn)

if len(schema_check) == 0:
    print("Research schema not found or no access")
else:
    print("Available research tables:")
    print(schema_check['tablename'].tolist())

Best Practices for Research Data

  1. Always close connections: Use conn.close() when finished or use context managers
  2. Filter by ADRC cohort: Use WHERE adrc_cohort = 1 for main research population
  3. Use schema prefix: Always specify research.table_name
  4. Join with concepts: Always join to research.concept for readable names
  5. Handle large datasets: Use chunked reading for measurements and observations
  6. Monitor performance: Use time.time() for query optimization
  7. Use context managers: Ensure connections are properly closed even if errors occur
import os
import psycopg2
import pandas as pd
from dotenv import load_dotenv
from contextlib import contextmanager

# Read DB_* credentials from the project's .env file into the environment.
load_dotenv()

@contextmanager
def get_db_connection():
    """Yield a PostgreSQL connection and guarantee it is closed afterwards.

    Credentials come from the DB_* environment variables. psycopg2 errors
    are printed and re-raised so the caller can decide how to react.
    """
    credentials = {
        "host": os.getenv("DB_HOST"),
        "database": os.getenv("DB_NAME"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
        "port": os.getenv("DB_PORT", "5432"),
    }
    conn = None
    try:
        conn = psycopg2.connect(**credentials)
        yield conn
    except psycopg2.Error as e:
        print(f"Database error: {e}")
        raise
    finally:
        # Runs on success, error, or caller exception: never leak the connection.
        if conn:
            conn.close()

def research_analysis():
    """Run a sample best-practice query against the research schema.

    Selects age-65+ condition records for the ADRC cohort, joined to the
    concept table for readable condition names. Returns a DataFrame on
    success, or None if anything fails (the error is printed).
    """
    try:
        with get_db_connection() as db:
            # Filter ADRC cohort and join with concepts
            query = """
                SELECT 
                    p.person_id,
                    p.gender_source_value,
                    co.age_at_condition_start,
                    c.concept_name as condition_name
                FROM research.person p
                JOIN research.condition_occurrence co ON p.person_id = co.person_id
                JOIN research.concept c ON co.condition_concept_id = c.concept_id
                WHERE p.adrc_cohort = 1
                    AND co.age_at_condition_start >= 65
                ORDER BY co.age_at_condition_start
            """

            return pd.read_sql_query(query, db)

    except Exception as e:
        print(f"Query failed: {e}")
        return None

# Usage
# research_analysis() returns a DataFrame on success or None on failure,
# so check before touching the result.
results = research_analysis()
if results is not None:
    print(f"Retrieved {len(results)} records")
    print(results.head())

Additional Resources

Python PostgreSQL Packages

OMOP and Research Data