1. Pandas简介与基础

Pandas是Python中最流行的数据分析库之一,它提供了强大的数据结构和数据分析工具,使数据操作变得简单高效。Pandas的核心数据结构是Series(一维)和DataFrame(二维),它们能够处理各种类型的数据,并提供了丰富的方法进行数据清洗、转换、分析和可视化。
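
下面给出一个最小示例(数据为虚构),直观展示Series和DataFrame这两种核心结构的创建方式:

import pandas as pd

# Series:带标签的一维数据
s = pd.Series([10, 20, 30], index=['a', 'b', 'c'])

# DataFrame:带行列标签的二维表格
df = pd.DataFrame({
    'name': ['Alice', 'Bob'],
    'score': [85, 92]
})

print(s)
print(df)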

在开始使用Pandas进行数据读取之前,首先需要确保已安装Pandas库:

# 安装Pandas(在命令行中执行)
pip install pandas

# 导入Pandas
import pandas as pd

Pandas支持从多种数据源读取数据,包括文件(CSV、Excel、JSON等)、数据库、Web API等。掌握这些数据读取方法是数据分析的第一步,也是至关重要的一步。
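
作为后文各章的预览,下面的速览示意列出几类常见数据源对应的读取入口(文件名均为假设,仅作演示):

import pandas as pd

df_csv = pd.read_csv('data.csv')        # CSV/文本文件
df_excel = pd.read_excel('data.xlsx')   # Excel文件
df_json = pd.read_json('data.json')     # JSON文件
# 数据库读取(pd.read_sql)与Web API读取(requests + DataFrame)见后续章节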

2. 基本文件数据读取

2.1 CSV文件读取

CSV(Comma-Separated Values)是最常见的数据交换格式之一。Pandas提供了read_csv()函数来读取CSV文件。

# 基本CSV读取
df = pd.read_csv('data.csv')

# 指定分隔符
df = pd.read_csv('data.txt', sep='\t')  # 制表符分隔的文件

# 指定编码
df = pd.read_csv('data.csv', encoding='utf-8')

# 跳过行
df = pd.read_csv('data.csv', skiprows=1)        # 跳过第一行
df = pd.read_csv('data.csv', skiprows=[1, 3, 5])  # 跳过指定行

# 指定列名
df = pd.read_csv('data.csv', names=['col1', 'col2', 'col3'])

# 只读取特定列
df = pd.read_csv('data.csv', usecols=['col1', 'col2'])

# 处理日期列
df = pd.read_csv('data.csv', parse_dates=['date_column'])

# 设置索引列
df = pd.read_csv('data.csv', index_col='id_column')

# 处理缺失值
df = pd.read_csv('data.csv', na_values=['NA', 'N/A', 'null', 'NaN'])

# 限制读取行数
df = pd.read_csv('data.csv', nrows=1000)  # 只读取前1000行

# 处理大文件 - 分块读取
chunk_iter = pd.read_csv('large_data.csv', chunksize=10000)
for chunk in chunk_iter:
    process(chunk)  # 处理每个数据块(process为自定义处理函数)

实际应用示例:

# 读取销售数据并处理日期
sales_df = pd.read_csv(
    'sales_data.csv',
    parse_dates=['order_date'],
    dayfirst=True,    # 日期格式为日/月/年
    thousands=',',    # 处理千位分隔符
    dtype={
        'customer_id': 'str',  # 指定列数据类型
        'amount': 'float'
    }
)

# 显示数据基本信息
sales_df.info()
print(sales_df.head())

# 转换日期列为月份
sales_df['order_month'] = sales_df['order_date'].dt.to_period('M')

# 按月份分组计算销售额
monthly_sales = sales_df.groupby('order_month')['amount'].sum()
print(monthly_sales)

2.2 Excel文件读取

Excel文件是商业环境中常用的数据存储格式。Pandas提供了read_excel()函数来读取Excel文件。

# 基本Excel读取
df = pd.read_excel('data.xlsx')

# 指定工作表
df = pd.read_excel('data.xlsx', sheet_name='Sheet1')  # 通过名称
df = pd.read_excel('data.xlsx', sheet_name=0)         # 通过索引

# 读取多个工作表
all_sheets = pd.read_excel('data.xlsx', sheet_name=None)
for sheet_name, df in all_sheets.items():
    print(f"Sheet: {sheet_name}")
    print(df.head())

# 指定行和列
df = pd.read_excel('data.xlsx', header=1)       # 使用第二行作为列名
df = pd.read_excel('data.xlsx', usecols='A:C')  # 读取A到C列

# 处理日期
df = pd.read_excel('data.xlsx', parse_dates=['date_column'])

# 跳过开头和结尾的行
df = pd.read_excel('data.xlsx', skiprows=2, skipfooter=1)  # 跳过前2行和最后1行

实际应用示例:

# 读取财务报表数据
financial_df = pd.read_excel(
    'financial_report.xlsx',
    sheet_name='Q4_2023',
    header=1,        # 使用第二行作为列名
    usecols='A:F',   # 只读取A到F列
    na_values=['-', 'NA', 'N/A'],
    converters={
        'Account': str,   # 账户代码保持为字符串
        'Amount': float   # 金额转换为浮点数
    }
)

# 计算总金额
total_amount = financial_df['Amount'].sum()
print(f"Total Amount: {total_amount:,.2f}")

# 按类别分组统计
category_stats = financial_df.groupby('Category')['Amount'].agg(['sum', 'count', 'mean'])
print(category_stats)

2.3 文本文件读取

除了CSV,Pandas还可以读取其他格式的文本文件,如固定宽度格式的文件。

# 读取固定宽度格式的文件
df = pd.read_fwf(
    'fixed_width_data.txt',
    widths=[10, 15, 10, 10],  # 每列的宽度
    names=['col1', 'col2', 'col3', 'col4'],
    dtype={'col1': str, 'col2': str}
)

# 读取表格化的文本文件
df = pd.read_table(
    'tabular_data.txt',
    delimiter='|',          # 自定义分隔符
    skipinitialspace=True   # 跳过分隔符后的空格
)

实际应用示例:

# 读取日志文件
log_df = pd.read_fwf(
    'server.log',
    colspecs=[(0, 10), (11, 25), (26, 40), (41, None)],  # 指定每列的起始和结束位置
    names=['timestamp', 'ip_address', 'user_id', 'message'],
    converters={
        'timestamp': lambda x: pd.to_datetime(x, format='%Y-%m-%d'),
        'ip_address': str
    }
)

# 提取错误日志
error_logs = log_df[log_df['message'].str.contains('ERROR', case=False)]
print(error_logs.head())

# 按IP统计错误次数
error_by_ip = error_logs['ip_address'].value_counts().head(10)
print("Top 10 IPs with errors:")
print(error_by_ip)

3. 数据库数据读取

3.1 SQL数据库读取

Pandas可以与多种SQL数据库交互,如SQLite、MySQL、PostgreSQL等。

# SQLite数据库读取
import sqlite3

# 创建数据库连接
conn = sqlite3.connect('database.db')

# 读取整个表到DataFrame
df = pd.read_sql('SELECT * FROM table_name', conn)

# 使用参数化查询
df = pd.read_sql(
    'SELECT * FROM table_name WHERE column_name = ?',
    conn,
    params=('value',)
)

# 执行复杂查询
query = """
SELECT t1.*, t2.column_name
FROM table1 t1
JOIN table2 t2 ON t1.id = t2.id
WHERE t1.date BETWEEN ? AND ?
ORDER BY t1.date DESC
"""
df = pd.read_sql(query, conn, params=('2023-01-01', '2023-12-31'))

# 关闭连接
conn.close()

使用SQLAlchemy进行更高级的数据库操作:

from sqlalchemy import create_engine

# 创建数据库引擎
# SQLite
engine = create_engine('sqlite:///database.db')
# MySQL
# engine = create_engine('mysql+pymysql://username:password@host:port/database')
# PostgreSQL
# engine = create_engine('postgresql://username:password@host:port/database')

# 使用引擎读取数据
df = pd.read_sql_table('table_name', engine)

# 使用SQL查询
df = pd.read_sql_query('SELECT * FROM table_name', engine)

# 使用上下文管理器确保连接关闭
with engine.connect() as conn:
    df = pd.read_sql('SELECT * FROM table_name', conn)

实际应用示例:

# 从SQLite数据库分析销售数据
from sqlalchemy import create_engine

# 创建数据库引擎
engine = create_engine('sqlite:///sales.db')

# 查询销售数据并计算每个产品的月度销售趋势
query = """
SELECT
    p.product_name,
    strftime('%Y-%m', o.order_date) as month,
    SUM(oi.quantity * oi.unit_price) as revenue,
    SUM(oi.quantity) as units_sold
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE o.order_date BETWEEN '2023-01-01' AND '2023-12-31'
GROUP BY p.product_name, strftime('%Y-%m', o.order_date)
ORDER BY p.product_name, month
"""

# 读取数据
sales_trend = pd.read_sql(query, engine)

# 转换为透视表以便分析
sales_pivot = sales_trend.pivot(
    index='product_name',
    columns='month',
    values='revenue'
)

# 计算每个产品的年度总销售额
sales_pivot['total_annual'] = sales_pivot.sum(axis=1)

# 找出最畅销的10个产品
top_products = sales_pivot.sort_values('total_annual', ascending=False).head(10)
print("Top 10 Products by Annual Revenue:")
print(top_products)

# 可视化趋势
top_products.drop('total_annual', axis=1).T.plot(kind='line', figsize=(12, 6))

3.2 NoSQL数据库读取

Pandas也可以与NoSQL数据库(如MongoDB)进行交互。

# MongoDB读取
from pymongo import MongoClient

# 连接到MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['database_name']
collection = db['collection_name']

# 查询数据并转换为DataFrame
cursor = collection.find({'field': 'value'})
df = pd.DataFrame(list(cursor))

# 关闭连接
client.close()

实际应用示例:

# 从MongoDB读取用户行为数据
from datetime import datetime
from pymongo import MongoClient

# 连接到MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['user_analytics']
collection = db['user_events']

# 查询特定时间段内的用户行为
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)

query = {
    'event_timestamp': {
        '$gte': start_date,
        '$lte': end_date
    }
}

# 只选择需要的字段
projection = {
    'user_id': 1,
    'event_type': 1,
    'event_timestamp': 1,
    'page_url': 1
}

# 执行查询
cursor = collection.find(query, projection).sort('event_timestamp', 1)
events_df = pd.DataFrame(list(cursor))

# 关闭连接
client.close()

# 转换时间戳
events_df['event_timestamp'] = pd.to_datetime(events_df['event_timestamp'])

# 提取日期和小时
events_df['date'] = events_df['event_timestamp'].dt.date
events_df['hour'] = events_df['event_timestamp'].dt.hour

# 分析每日活跃用户
daily_active_users = events_df.groupby('date')['user_id'].nunique()
print("Daily Active Users:")
print(daily_active_users.head())

# 分析每小时事件类型分布
hourly_events = events_df.groupby(['hour', 'event_type']).size().unstack()
print("Hourly Event Distribution:")
print(hourly_events.head())

4. 网络数据读取

4.1 从URL读取数据

Pandas可以直接从URL读取数据,无需先下载到本地。

# 从URL读取CSV
url = 'https://example.com/data.csv'
df = pd.read_csv(url)

# 从URL读取Excel
url = 'https://example.com/data.xlsx'
df = pd.read_excel(url)

# 处理需要认证的URL
import requests
from io import StringIO

url = 'https://example.com/protected_data.csv'
headers = {'Authorization': 'Bearer your_token_here'}

response = requests.get(url, headers=headers)
if response.status_code == 200:
    df = pd.read_csv(StringIO(response.text))
else:
    print(f"Failed to fetch data: {response.status_code}")

实际应用示例:

# 从公开数据集分析COVID-19数据
import pandas as pd
import matplotlib.pyplot as plt

# 从Johns Hopkins GitHub仓库读取COVID-19数据
url = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv'

# 读取数据
covid_df = pd.read_csv(url)

# 查看数据结构
print(covid_df.head())
covid_df.info()

# 数据重塑:将日期列转换为行
id_cols = ['Province/State', 'Country/Region', 'Lat', 'Long']
date_cols = [col for col in covid_df.columns if col not in id_cols]

# 使用melt进行数据重塑
covid_long = pd.melt(
    covid_df,
    id_vars=id_cols,
    value_vars=date_cols,
    var_name='Date',
    value_name='Confirmed'
)

# 转换日期列
covid_long['Date'] = pd.to_datetime(covid_long['Date'])

# 按国家和日期汇总
country_data = covid_long.groupby(['Country/Region', 'Date'])['Confirmed'].sum().reset_index()

# 选择几个国家进行比较
countries = ['US', 'Italy', 'China', 'Brazil', 'India']
selected_countries = country_data[country_data['Country/Region'].isin(countries)]

# 创建透视表以便可视化
pivot_data = selected_countries.pivot(
    index='Date',
    columns='Country/Region',
    values='Confirmed'
)

# 绘制趋势图
plt.figure(figsize=(12, 8))
for country in countries:
    plt.plot(pivot_data.index, pivot_data[country], label=country)

plt.title('COVID-19 Confirmed Cases Over Time')
plt.xlabel('Date')
plt.ylabel('Confirmed Cases')
plt.legend()
plt.grid(True)
plt.show()

4.2 从API读取数据

许多Web服务提供API接口,Pandas可以结合requests库从API获取数据。

import requests
import pandas as pd

# 基本API请求
url = 'https://api.example.com/data'
response = requests.get(url)
data = response.json()
df = pd.DataFrame(data)

# 带参数的API请求
params = {
    'start_date': '2023-01-01',
    'end_date': '2023-12-31',
    'limit': 1000
}
response = requests.get(url, params=params)
data = response.json()
df = pd.DataFrame(data['results'])  # 假设数据在results字段中

# 处理分页API
all_data = []
page = 1

while True:
    params['page'] = page
    response = requests.get(url, params=params)
    data = response.json()

    if not data['results']:  # 如果没有更多数据
        break

    all_data.extend(data['results'])
    page += 1

df = pd.DataFrame(all_data)

实际应用示例:

# 从OpenWeatherMap API获取天气数据并分析
import requests
import pandas as pd
import time

# API密钥(实际使用时应该从环境变量或配置文件中获取)
API_KEY = 'your_api_key_here'
BASE_URL = 'https://api.openweathermap.org/data/2.5'

# 获取多个城市的天气数据
cities = ['Beijing', 'Shanghai', 'New York', 'London', 'Tokyo']
weather_data = []

for city in cities:
    # 构建API请求URL
    url = f"{BASE_URL}/weather"
    params = {
        'q': city,
        'appid': API_KEY,
        'units': 'metric'  # 使用摄氏度
    }

    # 发送请求
    response = requests.get(url, params=params)

    if response.status_code == 200:
        data = response.json()

        # 提取所需信息
        city_info = {
            'city': city,
            'country': data['sys']['country'],
            'temperature': data['main']['temp'],
            'feels_like': data['main']['feels_like'],
            'humidity': data['main']['humidity'],
            'pressure': data['main']['pressure'],
            'description': data['weather'][0]['description'],
            'wind_speed': data['wind']['speed'],
            'timestamp': pd.to_datetime(data['dt'], unit='s')
        }
        weather_data.append(city_info)
    else:
        print(f"Failed to get weather data for {city}: {response.status_code}")

    # 避免API速率限制
    time.sleep(1)

# 转换为DataFrame
weather_df = pd.DataFrame(weather_data)

# 分析数据
print("Weather Data Summary:")
print(weather_df.describe())

# 找出最热和最冷的城市
hottest = weather_df.loc[weather_df['temperature'].idxmax()]
coldest = weather_df.loc[weather_df['temperature'].idxmin()]

print(f"\nHottest city: {hottest['city']}, {hottest['country']} - {hottest['temperature']}°C")
print(f"Coldest city: {coldest['city']}, {coldest['country']} - {coldest['temperature']}°C")

# 按温度排序
sorted_by_temp = weather_df.sort_values('temperature', ascending=False)
print("\nCities sorted by temperature:")
print(sorted_by_temp[['city', 'country', 'temperature']])

5. 特殊格式数据读取

5.1 JSON数据读取

JSON(JavaScript Object Notation)是Web API常用的数据交换格式。

# 基本JSON读取
df = pd.read_json('data.json')

# 处理嵌套JSON
import json

with open('nested_data.json', 'r') as f:
    data = json.load(f)

# 使用json_normalize处理嵌套结构
from pandas import json_normalize
df = json_normalize(data, 'items', ['meta_field1', 'meta_field2'])

# 从字符串读取JSON(记录列表形式,包装为StringIO)
from io import StringIO
json_string = '[{"name": "John", "age": 30, "city": "New York"}]'
df = pd.read_json(StringIO(json_string))

# 处理JSON行格式(每行一个JSON对象)
df = pd.read_json('data.jsonl', lines=True)

实际应用示例:

# 分析Twitter API返回的JSON数据
import pandas as pd
from pandas import json_normalize
import json

# 假设我们从Twitter API获取了推文数据
# 这里我们使用一个示例JSON文件
with open('tweets.json', 'r') as f:
    tweets_data = json.load(f)

# 展平嵌套的JSON结构
tweets_df = json_normalize(tweets_data)

# 查看数据结构
print("Columns in tweets data:")
print(tweets_df.columns.tolist())

# 选择有用的列
useful_columns = [
    'id', 'created_at', 'text', 'source',
    'user.id', 'user.name', 'user.screen_name', 'user.followers_count',
    'retweet_count', 'favorite_count'
]

# 确保只选择存在的列
existing_columns = [col for col in useful_columns if col in tweets_df.columns]
tweets_clean = tweets_df[existing_columns].copy()

# 转换日期列
tweets_clean['created_at'] = pd.to_datetime(tweets_clean['created_at'])

# 提取日期和小时
tweets_clean['date'] = tweets_clean['created_at'].dt.date
tweets_clean['hour'] = tweets_clean['created_at'].dt.hour

# 分析每日推文数量
daily_tweets = tweets_clean.groupby('date').size()
print("Daily tweet counts:")
print(daily_tweets.head())

# 分析每小时推文分布
hourly_tweets = tweets_clean.groupby('hour').size()
print("Hourly tweet distribution:")
print(hourly_tweets)

# 找出最受欢迎的推文(按转发和点赞数)
tweets_clean['engagement'] = tweets_clean['retweet_count'] + tweets_clean['favorite_count']
top_tweets = tweets_clean.nlargest(5, 'engagement')

print("\nTop 5 tweets by engagement:")
for idx, tweet in top_tweets.iterrows():
    print(f"Tweet ID: {tweet['id']}")
    print(f"User: {tweet['user.name']} (@{tweet['user.screen_name']})")
    print(f"Text: {tweet['text'][:100]}...")
    print(f"Engagement: {tweet['engagement']} (Retweets: {tweet['retweet_count']}, Likes: {tweet['favorite_count']})")
    print("-" * 50)

5.2 HTML数据读取

Pandas可以从HTML页面中提取表格数据。

# 从URL读取HTML表格
url = 'https://example.com/page_with_tables.html'
tables = pd.read_html(url)

# tables是一个DataFrame列表,每个表格一个DataFrame
df = tables[0]  # 获取第一个表格

# 从本地HTML文件读取
tables = pd.read_html('local_page.html')
df = tables[0]

# 指定表格索引或属性
tables = pd.read_html(url, attrs={'id': 'specific_table'})
df = tables[0]

# 处理多表格情况
for i, table in enumerate(tables):
    print(f"Table {i}: {table.shape}")

实际应用示例:

# 从维基百科页面提取历史数据并分析
import pandas as pd
import matplotlib.pyplot as plt

# 从维基百科获取世界人口历史数据
url = 'https://en.wikipedia.org/wiki/World_population'
tables = pd.read_html(url)

# 找到包含历史人口数据的表格
# 通常需要检查多个表格以找到正确的表格
population_table = None
for table in tables:
    if 'Year' in table.columns and 'World population' in table.columns:
        population_table = table
        break

if population_table is not None:
    # 清理数据
    # 重命名列
    population_table.columns = ['Year', 'World_population', 'Annual_change']

    # 移除包含非数字的行
    population_table = population_table[population_table['Year'].apply(lambda x: str(x).isdigit())]

    # 转换数据类型
    population_table['Year'] = population_table['Year'].astype(int)
    population_table['World_population'] = (
        population_table['World_population'].astype(str).str.replace(',', '', regex=False).astype(float)
    )

    # 计算每10年的人口增长率
    population_table['Decade'] = (population_table['Year'] // 10) * 10
    decade_population = population_table.groupby('Decade')['World_population'].last()

    # 计算十年增长率
    decade_growth = decade_population.pct_change() * 100

    # 可视化
    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(population_table['Year'], population_table['World_population'] / 1e9)
    plt.title('World Population Over Time')
    plt.xlabel('Year')
    plt.ylabel('Population (billions)')
    plt.grid(True)

    plt.subplot(1, 2, 2)
    decade_growth.plot(kind='bar')
    plt.title('Decadal Population Growth Rate')
    plt.xlabel('Decade')
    plt.ylabel('Growth Rate (%)')
    plt.grid(True, axis='y')

    plt.tight_layout()
    plt.show()

    # 分析人口翻倍时间
    # 找到人口翻倍的时期
    for i in range(1, len(decade_population)):
        if decade_population.iloc[i] >= 2 * decade_population.iloc[i-1]:
            start_decade = decade_population.index[i-1]
            end_decade = decade_population.index[i]
            years = end_decade - start_decade
            print(f"Population doubled between {start_decade}s and {end_decade}s, taking {years} years")
else:
    print("Could not find population data table")

5.3 其他格式数据读取

Pandas还支持多种其他格式的数据读取,如HDF5、Parquet、Feather等。

# HDF5格式读取
df = pd.read_hdf('data.h5', 'table_name')

# Parquet格式读取
df = pd.read_parquet('data.parquet')

# Feather格式读取
df = pd.read_feather('data.feather')

# Stata格式读取
df = pd.read_stata('data.dta')

# SAS格式读取
df = pd.read_sas('data.sas7bdat')

# Pickle格式读取
df = pd.read_pickle('data.pkl')

实际应用示例:

# 比较不同文件格式的读写性能
import pandas as pd
import numpy as np
import time
import os

# 创建一个较大的测试DataFrame
rows = 100000
cols = 20
data = np.random.rand(rows, cols)
df = pd.DataFrame(data, columns=[f'col_{i}' for i in range(cols)])

# 测试不同格式的读写性能(每种格式记录对应文件名,便于统计大小和清理)
formats = {
    'CSV': {'file': 'test.csv',
            'write': lambda df: df.to_csv('test.csv', index=False),
            'read': lambda: pd.read_csv('test.csv')},
    'Parquet': {'file': 'test.parquet',
                'write': lambda df: df.to_parquet('test.parquet'),
                'read': lambda: pd.read_parquet('test.parquet')},
    'Feather': {'file': 'test.feather',
                'write': lambda df: df.to_feather('test.feather'),
                'read': lambda: pd.read_feather('test.feather')},
    'HDF5': {'file': 'test.h5',
             'write': lambda df: df.to_hdf('test.h5', key='data', mode='w'),
             'read': lambda: pd.read_hdf('test.h5', 'data')},
    'Pickle': {'file': 'test.pkl',
               'write': lambda df: df.to_pickle('test.pkl'),
               'read': lambda: pd.read_pickle('test.pkl')}
}

# 测试每种格式
results = []

for fmt_name, ops in formats.items():
    # 测试写入时间
    start_time = time.time()
    ops['write'](df)
    write_time = time.time() - start_time

    # 获取文件大小
    file_size = os.path.getsize(ops['file']) / (1024 * 1024)  # MB

    # 测试读取时间
    start_time = time.time()
    df_read = ops['read']()
    read_time = time.time() - start_time

    # 验证数据是否正确读取
    assert df.equals(df_read), f"Data integrity check failed for {fmt_name}"

    # 记录结果
    results.append({
        'Format': fmt_name,
        'Write Time (s)': write_time,
        'Read Time (s)': read_time,
        'Total Time (s)': write_time + read_time,
        'File Size (MB)': file_size
    })

    # 清理测试文件
    os.remove(ops['file'])

# 显示结果
results_df = pd.DataFrame(results)
print(results_df)

# 可视化比较
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 8))

plt.subplot(2, 1, 1)
plt.bar(results_df['Format'], results_df['Write Time (s)'], label='Write Time')
plt.bar(results_df['Format'], results_df['Read Time (s)'],
        bottom=results_df['Write Time (s)'], label='Read Time')
plt.title('Read/Write Time by Format')
plt.ylabel('Time (s)')
plt.legend()

plt.subplot(2, 1, 2)
plt.bar(results_df['Format'], results_df['File Size (MB)'])
plt.title('File Size by Format')
plt.ylabel('Size (MB)')

plt.tight_layout()
plt.show()

6. 大数据处理技巧

6.1 分块读取大数据

当处理大型数据集时,内存可能成为限制因素。Pandas提供了分块处理的功能。

# 分块读取CSV文件
chunk_size = 10000  # 每块的行数
chunks = pd.read_csv('large_data.csv', chunksize=chunk_size)

# 处理每个数据块
results = []
for chunk in chunks:
    # 对每个数据块进行处理(process_function为自定义处理函数)
    processed_chunk = process_function(chunk)
    results.append(processed_chunk)

# 合并结果
final_result = pd.concat(results)

# 使用聚合函数处理分块数据
# 例如,计算总和
total_sum = 0
for chunk in pd.read_csv('large_data.csv', chunksize=chunk_size):
    total_sum += chunk['value_column'].sum()

print(f"Total sum: {total_sum}")

实际应用示例:

# 处理大型销售数据集,计算月度销售额
import pandas as pd
import matplotlib.pyplot as plt

# 假设我们有一个非常大的销售数据文件
# 我们想计算每个产品类别的月度销售额

# 初始化一个空的DataFrame来存储聚合结果
monthly_sales = pd.DataFrame()

# 分块读取数据
chunk_size = 50000
chunks = pd.read_csv('huge_sales_data.csv', chunksize=chunk_size)

for i, chunk in enumerate(chunks):
    print(f"Processing chunk {i+1}...")

    # 转换日期列为datetime
    chunk['order_date'] = pd.to_datetime(chunk['order_date'])

    # 提取年月
    chunk['year_month'] = chunk['order_date'].dt.to_period('M')

    # 按产品类别和月份分组计算销售额
    chunk_sales = chunk.groupby(['category', 'year_month'])['amount'].sum().reset_index()

    # 累积聚合结果
    if monthly_sales.empty:
        monthly_sales = chunk_sales
    else:
        # 合并并累加销售额
        monthly_sales = pd.concat([monthly_sales, chunk_sales])
        monthly_sales = monthly_sales.groupby(['category', 'year_month'])['amount'].sum().reset_index()

# 最终结果
print("Monthly sales by category:")
print(monthly_sales.head(10))

# 创建透视表以便更好地分析
sales_pivot = monthly_sales.pivot(
    index='category',
    columns='year_month',
    values='amount'
)

# 找出销售额最高的类别
sales_pivot['total'] = sales_pivot.sum(axis=1)
top_categories = sales_pivot.sort_values('total', ascending=False).head(5)

print("\nTop 5 categories by total sales:")
print(top_categories[['total']])

# 可视化前5类产品的月度销售趋势
top_categories.drop('total', axis=1).T.plot(figsize=(12, 6))
plt.title('Monthly Sales Trend for Top 5 Categories')
plt.ylabel('Sales Amount')
plt.xlabel('Month')
plt.grid(True)
plt.show()

6.2 使用Dask处理超大数据

对于超出内存容量的数据集,可以考虑使用Dask,它提供了类似Pandas的API但可以处理大于内存的数据集。

# 安装Dask
# pip install dask

import dask.dataframe as dd

# 创建Dask DataFrame
ddf = dd.read_csv('very_large_data.csv')

# 执行类似Pandas的操作
result = ddf.groupby('category').value.mean()

# 计算结果(触发实际计算)
computed_result = result.compute()

# 转换为Pandas DataFrame
df_result = computed_result.to_frame()

实际应用示例:

# 使用Dask分析大型网站日志数据
import dask.dataframe as dd
import pandas as pd
import matplotlib.pyplot as plt

# 创建Dask DataFrame读取大型日志文件
# 假设日志文件格式为:timestamp,ip_address,userid,page_url,response_time,status_code
ddf = dd.read_csv(
    'web_server_logs/*.log',
    names=['timestamp', 'ip_address', 'userid', 'page_url', 'response_time', 'status_code'],
    parse_dates=['timestamp'],
    dtype={
        'ip_address': 'object',
        'userid': 'object',
        'page_url': 'object',
        'response_time': 'float64',
        'status_code': 'int64'
    }
)

# 提取小时和日期
ddf['hour'] = ddf['timestamp'].dt.hour
ddf['date'] = ddf['timestamp'].dt.date

# 计算每小时的平均响应时间
hourly_response = ddf.groupby('hour')['response_time'].mean().compute()

# 计算每日的请求量
daily_requests = ddf.groupby('date').size().compute()

# 计算最常访问的页面
top_pages = ddf['page_url'].value_counts().compute().head(10)

# 计算状态码分布
status_distribution = ddf['status_code'].value_counts().compute()

# 可视化结果
plt.figure(figsize=(15, 10))

plt.subplot(2, 2, 1)
hourly_response.plot(kind='bar')
plt.title('Average Response Time by Hour')
plt.xlabel('Hour of Day')
plt.ylabel('Average Response Time (ms)')
plt.grid(True, axis='y')

plt.subplot(2, 2, 2)
daily_requests.plot()
plt.title('Daily Request Volume')
plt.xlabel('Date')
plt.ylabel('Number of Requests')
plt.grid(True)

plt.subplot(2, 2, 3)
top_pages.plot(kind='barh')
plt.title('Top 10 Most Accessed Pages')
plt.xlabel('Number of Requests')
plt.grid(True, axis='x')

plt.subplot(2, 2, 4)
status_distribution.plot(kind='pie', autopct='%1.1f%%')
plt.title('HTTP Status Code Distribution')

plt.tight_layout()
plt.show()

# 分析高流量时段
peak_hours = hourly_response.nlargest(3)
print("\nPeak hours with highest average response time:")
print(peak_hours)

# 分析错误率
error_rate = ddf[ddf['status_code'] >= 400].groupby('date').size() / ddf.groupby('date').size()
error_rate = error_rate.compute() * 100  # 转换为百分比

high_error_days = error_rate.nlargest(5)
print("\nDays with highest error rates:")
print(high_error_days)

6.3 数据类型优化

优化数据类型可以显著减少内存使用,提高处理速度。

# 检查DataFrame的内存使用
df.info(memory_usage='deep')

# 优化数值类型
df['integer_column'] = pd.to_numeric(df['integer_column'], downcast='integer')
df['float_column'] = pd.to_numeric(df['float_column'], downcast='float')

# 优化对象类型(字符串)
df['string_column'] = df['string_column'].astype('category')  # 适用于低基数字符串列

# 优化日期时间类型
df['date_column'] = pd.to_datetime(df['date_column'])

# 创建一个函数来优化整个DataFrame
def optimize_dataframe(df):
    # 转换数值类型
    for col in df.select_dtypes(include=['int64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')

    for col in df.select_dtypes(include=['float64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')

    # 转换对象类型
    for col in df.select_dtypes(include=['object']).columns:
        num_unique = df[col].nunique()
        if num_unique / len(df[col]) < 0.5:  # 如果唯一值比例小于50%
            df[col] = df[col].astype('category')

    return df

# 应用优化
optimized_df = optimize_dataframe(df.copy())

print("Memory usage before optimization:")
print(df.memory_usage(deep=True).sum() / (1024 * 1024), "MB")
print("Memory usage after optimization:")
print(optimized_df.memory_usage(deep=True).sum() / (1024 * 1024), "MB")

实际应用示例:

# 优化大型客户数据集的内存使用
import pandas as pd
import numpy as np

# 创建一个模拟的大型客户数据集
np.random.seed(42)
num_customers = 1000000

customer_data = {
    'customer_id': range(1, num_customers + 1),
    'name': ['Customer ' + str(i) for i in range(1, num_customers + 1)],
    'age': np.random.randint(18, 80, size=num_customers),
    'gender': np.random.choice(['Male', 'Female', 'Other'], size=num_customers),
    'income': np.random.normal(50000, 15000, size=num_customers),
    # 注意:这里使用分钟频率,100万个按天递增的时间点会超出Pandas时间戳的可表示范围
    'registration_date': pd.date_range('2010-01-01', periods=num_customers, freq='min'),
    'last_purchase_date': pd.date_range('2020-01-01', periods=num_customers, freq='min'),
    'total_purchases': np.random.randint(0, 100, size=num_customers),
    'avg_purchase_amount': np.random.uniform(10, 500, size=num_customers),
    'preferred_category': np.random.choice(['Electronics', 'Clothing', 'Home', 'Books', 'Sports'], size=num_customers),
    'loyalty_tier': np.random.choice(['Bronze', 'Silver', 'Gold', 'Platinum'], size=num_customers),
    'is_active': np.random.choice([True, False], size=num_customers, p=[0.8, 0.2])
}

df = pd.DataFrame(customer_data)

# 检查原始内存使用
print("Original DataFrame memory usage:")
print(df.memory_usage(deep=True))
print(f"Total memory usage: {df.memory_usage(deep=True).sum() / (1024 * 1024):.2f} MB")

# 优化DataFrame
def optimize_customer_dataframe(df):
    # 创建副本以避免修改原始DataFrame
    df_opt = df.copy()

    # 优化整数列
    df_opt['customer_id'] = pd.to_numeric(df_opt['customer_id'], downcast='integer')
    df_opt['age'] = pd.to_numeric(df_opt['age'], downcast='integer')
    df_opt['total_purchases'] = pd.to_numeric(df_opt['total_purchases'], downcast='integer')

    # 优化浮点数列
    df_opt['income'] = pd.to_numeric(df_opt['income'], downcast='float')
    df_opt['avg_purchase_amount'] = pd.to_numeric(df_opt['avg_purchase_amount'], downcast='float')

    # 优化日期列
    df_opt['registration_date'] = pd.to_datetime(df_opt['registration_date'])
    df_opt['last_purchase_date'] = pd.to_datetime(df_opt['last_purchase_date'])

    # 优化低基数字符串列为category类型
    df_opt['gender'] = df_opt['gender'].astype('category')
    df_opt['preferred_category'] = df_opt['preferred_category'].astype('category')
    df_opt['loyalty_tier'] = df_opt['loyalty_tier'].astype('category')

    # 优化布尔列
    df_opt['is_active'] = df_opt['is_active'].astype('bool')

    return df_opt

# 应用优化
optimized_df = optimize_customer_dataframe(df)

# 检查优化后的内存使用
print("\nOptimized DataFrame memory usage:")
print(optimized_df.memory_usage(deep=True))
print(f"Total memory usage: {optimized_df.memory_usage(deep=True).sum() / (1024 * 1024):.2f} MB")

# 计算内存节省百分比
original_memory = df.memory_usage(deep=True).sum()
optimized_memory = optimized_df.memory_usage(deep=True).sum()
memory_saved_percent = (1 - optimized_memory / original_memory) * 100
print(f"\nMemory saved: {memory_saved_percent:.2f}%")

# 验证数据完整性:优化会改变dtype(例如float64降为float32),
# 因此先把优化后的数据转换回原始dtype,再在浮点误差容忍范围内比较
pd.testing.assert_frame_equal(
    df, optimized_df.astype(df.dtypes.to_dict()), check_exact=False
)
print("\nData integrity check passed: values are preserved after optimization.")

7. 实际案例与解决方案

7.1 数据清洗与预处理

实际数据往往包含缺失值、异常值和格式不一致的问题。以下是一个综合的数据清洗案例。

import pandas as pd
import numpy as np

# 读取包含各种问题的数据集
df = pd.read_csv('messy_data.csv')

# 1. 处理缺失值
# 检查缺失值
print("Missing values per column:")
print(df.isnull().sum())

# 根据列的特性处理缺失值
# 数值列:用中位数填充
numeric_cols = df.select_dtypes(include=['number']).columns
for col in numeric_cols:
    df[col] = df[col].fillna(df[col].median())

# 分类列:用众数填充
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
    df[col] = df[col].fillna(df[col].mode()[0])

# 2. 处理异常值
# 使用IQR方法检测异常值
def detect_outliers(df, col):
    Q1 = df[col].quantile(0.25)
    Q3 = df[col].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return (df[col] < lower_bound) | (df[col] > upper_bound)

# 对数值列处理异常值
for col in numeric_cols:
    outliers = detect_outliers(df, col)

    # 用边界值替换异常值
    Q1 = df[col].quantile(0.25)
    Q3 = df[col].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    df.loc[df[col] < lower_bound, col] = lower_bound
    df.loc[df[col] > upper_bound, col] = upper_bound

# 3. 标准化文本数据
# 去除前后空格
for col in categorical_cols:
    df[col] = df[col].str.strip()

# 统一大小写
df['category_column'] = df['category_column'].str.lower()

# 4. 转换数据类型
df['date_column'] = pd.to_datetime(df['date_column'], errors='coerce')
df['numeric_string_column'] = pd.to_numeric(df['numeric_string_column'], errors='coerce')

# 5. 删除重复数据
df.drop_duplicates(inplace=True)

# 6. 创建新特征
# 从日期提取年、月、日
df['year'] = df['date_column'].dt.year
df['month'] = df['date_column'].dt.month
df['day'] = df['date_column'].dt.day

# 分类别列
df['category_encoded'] = df['category_column'].astype('category').cat.codes

# 7. 处理不平衡数据(如果是分类问题)
# 注意:SMOTE要求特征全部为数值类型,实际使用前需先对非数值列做编码或剔除
from imblearn.over_sampling import SMOTE

# 假设我们有一个不平衡的目标变量
X = df.drop('target_column', axis=1)
y = df['target_column']

# 应用SMOTE过采样少数类
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)

# 创建处理后的DataFrame
df_processed = pd.concat(
    [pd.DataFrame(X_resampled), pd.DataFrame(y_resampled, columns=['target_column'])],
    axis=1
)

print("Data preprocessing completed.")
print("Final dataset shape:", df_processed.shape)

7.2 多源数据整合

实际分析中,经常需要整合来自不同数据源的数据。

import pandas as pd
import sqlite3
import requests
from io import StringIO

# 1. 从CSV读取销售数据
sales_df = pd.read_csv('sales_data.csv')

# 2. 从数据库读取客户信息
conn = sqlite3.connect('customer_database.db')
query = "SELECT * FROM customers"
customers_df = pd.read_sql(query, conn)
conn.close()

# 3. 从API获取产品信息
url = "https://api.example.com/products"
response = requests.get(url)
products_json = response.json()
products_df = pd.DataFrame(products_json)

# 4. 数据清洗和预处理
# 销售数据
sales_df['order_date'] = pd.to_datetime(sales_df['order_date'])
sales_df['month'] = sales_df['order_date'].dt.to_period('M')

# 客户数据
customers_df['join_date'] = pd.to_datetime(customers_df['join_date'])
customers_df['customer_segment'] = pd.cut(
    customers_df['annual_income'],
    bins=[0, 30000, 60000, 100000, float('inf')],
    labels=['Low', 'Medium', 'High', 'Premium']
)

# 产品数据
products_df['price'] = pd.to_numeric(products_df['price'], errors='coerce')
products_df['category'] = products_df['category'].str.lower()

# 5. 数据整合
# 合并销售和客户数据
sales_customer_df = pd.merge(
    sales_df,
    customers_df,
    left_on='customer_id',
    right_on='id',
    how='left'
)

# 合并产品数据
full_df = pd.merge(
    sales_customer_df,
    products_df,
    left_on='product_id',
    right_on='id',
    how='left',
    suffixes=('_customer', '_product')
)

# 6. 处理合并后的数据
# 合并产生的冗余键列(来自客户表和产品表的id)可以直接删除,
# 销售表中已经有customer_id和product_id
full_df = full_df.drop(columns=['id_customer', 'id_product'])

# 重命名列:产品名称列在合并时带上了后缀
full_df = full_df.rename(columns={'name_product': 'product_name'})

# 删除不需要的列
full_df = full_df.drop(columns=['name_customer'])  # 假设这是重复的客户名称列

# 处理缺失值
full_df['product_name'] = full_df['product_name'].fillna('Unknown Product')
full_df['category'] = full_df['category'].fillna('Other')

# 7. 创建分析特征
# 计算每个订单的总金额
full_df['total_amount'] = full_df['quantity'] * full_df['price']

# 按客户和月份分组计算销售统计
customer_monthly_stats = full_df.groupby(['customer_id', 'month']).agg({
    'total_amount': ['sum', 'mean', 'count'],
    'quantity': 'sum'
}).reset_index()

# 扁平化多级列索引
customer_monthly_stats.columns = ['_'.join(col).strip() if col[1] else col[0]
                                  for col in customer_monthly_stats.columns]

# 8. 保存整合后的数据
full_df.to_csv('integrated_sales_data.csv', index=False)
customer_monthly_stats.to_csv('customer_monthly_stats.csv', index=False)

print("Data integration completed.")
print("Integrated dataset shape:", full_df.shape)
print("Customer monthly stats shape:", customer_monthly_stats.shape)

7.3 时间序列数据分析

时间序列数据是数据分析中常见的数据类型,特别在金融、销售和传感器数据等领域。

import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.holtwinters import ExponentialSmoothing

# 1. 读取时间序列数据
df = pd.read_csv('time_series_data.csv')

# 2. 转换日期列并设置为索引
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)

# 3. 检查并处理缺失值
print("Missing values:", df.isnull().sum())

# 前向填充缺失值
df = df.ffill()

# 4. 重采样为不同频率
# 日数据重采样为周数据
weekly_data = df.resample('W').mean()

# 日数据重采样为月数据
monthly_data = df.resample('M').sum()

# 5. 时间序列分解
# 分解为趋势、季节性和残差成分
decomposition = seasonal_decompose(df['value'], model='additive', period=12)

# 绘制分解结果
plt.figure(figsize=(12, 8))

plt.subplot(411)
plt.plot(df['value'], label='Original')
plt.legend()

plt.subplot(412)
plt.plot(decomposition.trend, label='Trend')
plt.legend()

plt.subplot(413)
plt.plot(decomposition.seasonal, label='Seasonality')
plt.legend()

plt.subplot(414)
plt.plot(decomposition.resid, label='Residuals')
plt.legend()

plt.tight_layout()
plt.show()

# 6. 滚动统计
# 计算滚动平均和标准差
df['rolling_mean'] = df['value'].rolling(window=30).mean()
df['rolling_std'] = df['value'].rolling(window=30).std()

# 绘制原始数据和滚动平均
plt.figure(figsize=(12, 6))
plt.plot(df['value'], label='Original')
plt.plot(df['rolling_mean'], label='30-day Rolling Mean')
plt.legend()
plt.title('Time Series with Rolling Mean')
plt.show()

# 7. 时间序列预测
# 使用Holt-Winters指数平滑方法
# 拆分训练集和测试集
train_size = int(len(df) * 0.8)
train, test = df.iloc[:train_size], df.iloc[train_size:]

# 拟合模型
model = ExponentialSmoothing(
    train['value'],
    trend='add',
    seasonal='add',
    seasonal_periods=12
).fit()

# 预测
forecast = model.forecast(len(test))

# 计算误差
from sklearn.metrics import mean_squared_error, mean_absolute_error

mse = mean_squared_error(test['value'], forecast)
mae = mean_absolute_error(test['value'], forecast)
print(f"Mean Squared Error: {mse:.2f}")
print(f"Mean Absolute Error: {mae:.2f}")

# 绘制预测结果
plt.figure(figsize=(12, 6))
plt.plot(train.index, train['value'], label='Train')
plt.plot(test.index, test['value'], label='Test')
plt.plot(test.index, forecast, label='Forecast')
plt.legend()
plt.title('Time Series Forecast')
plt.show()

# 8. 异常检测
# 使用Z-score方法检测异常值
df['z_score'] = (df['value'] - df['value'].mean()) / df['value'].std()
threshold = 3  # 3个标准差
anomalies = df[df['z_score'].abs() > threshold]

# 绘制异常值
plt.figure(figsize=(12, 6))
plt.plot(df.index, df['value'], label='Original')
plt.scatter(anomalies.index, anomalies['value'], color='red', label='Anomalies')
plt.legend()
plt.title('Anomaly Detection')
plt.show()

print(f"Detected {len(anomalies)} anomalies")

8. 性能优化与最佳实践

8.1 提高数据读取性能

import pandas as pd
import time

# 1. 只读取需要的列
# 不好的做法:读取所有列
start_time = time.time()
df_all = pd.read_csv('large_dataset.csv')
print(f"Read all columns: {time.time() - start_time:.2f} seconds")

# 好的做法:只读取需要的列
start_time = time.time()
cols_to_read = ['col1', 'col2', 'col3']  # 只需要的列
df_selected = pd.read_csv('large_dataset.csv', usecols=cols_to_read)
print(f"Read selected columns: {time.time() - start_time:.2f} seconds")

# 2. 指定数据类型
# 不好的做法:让Pandas推断数据类型
start_time = time.time()
df_inferred = pd.read_csv('large_dataset.csv')
print(f"Read with inferred dtypes: {time.time() - start_time:.2f} seconds")

# 好的做法:指定数据类型
start_time = time.time()
dtypes = {
    'col1': 'int32',
    'col2': 'float32',
    'col3': 'category',
    'col4': 'object'
}
df_specified = pd.read_csv('large_dataset.csv', dtype=dtypes)
print(f"Read with specified dtypes: {time.time() - start_time:.2f} seconds")

# 3. 使用更高效的文件格式
# CSV读取
start_time = time.time()
df_csv = pd.read_csv('large_dataset.csv')
print(f"Read CSV: {time.time() - start_time:.2f} seconds")

# 保存为Parquet格式
df_csv.to_parquet('large_dataset.parquet')

# Parquet读取
start_time = time.time()
df_parquet = pd.read_parquet('large_dataset.parquet')
print(f"Read Parquet: {time.time() - start_time:.2f} seconds")

# 4. 关闭低内存分块解析
# low_memory=False让解析器一次性读入整个文件,避免分块推断导致的混合类型警告
start_time = time.time()
df_low_memory = pd.read_csv('large_dataset.csv', low_memory=False)
print(f"Read with low_memory=False: {time.time() - start_time:.2f} seconds")

# 5. 使用更快的解析引擎
start_time = time.time()
df_c_engine = pd.read_csv('large_dataset.csv', engine='c')  # C引擎(默认),比python引擎快得多
print(f"Read with C engine: {time.time() - start_time:.2f} seconds")
# 如需多线程解析,可在pandas 1.4+尝试pyarrow引擎:
# df_arrow = pd.read_csv('large_dataset.csv', engine='pyarrow')

8.2 Pandas操作优化

import pandas as pd
import numpy as np
import time

# 创建测试DataFrame
size = 1000000
df = pd.DataFrame({
    'A': np.random.rand(size),
    'B': np.random.rand(size),
    'C': np.random.choice(['low', 'medium', 'high'], size=size),
    # 使用分钟频率,100万个按天递增的时间点会超出Pandas时间戳范围
    'D': pd.date_range('2020-01-01', periods=size, freq='min')
})

# 1. 避免循环,使用向量化操作
# 不好的做法:使用循环
start_time = time.time()
result = []
for i in range(len(df)):
    result.append(df['A'].iloc[i] + df['B'].iloc[i])
df['E'] = result
print(f"Loop operation: {time.time() - start_time:.2f} seconds")

# 好的做法:使用向量化操作
start_time = time.time()
df['E'] = df['A'] + df['B']
print(f"Vectorized operation: {time.time() - start_time:.2f} seconds")

# 2. 使用apply替代循环
# 不好的做法:使用循环
start_time = time.time()
categories = []
for val in df['A']:
    if val < 0.3:
        categories.append('low')
    elif val < 0.7:
        categories.append('medium')
    else:
        categories.append('high')
df['F'] = categories
print(f"Categorical assignment with loop: {time.time() - start_time:.2f} seconds")

# 好的做法:使用apply
start_time = time.time()
def categorize(value):
    if value < 0.3:
        return 'low'
    elif value < 0.7:
        return 'medium'
    else:
        return 'high'

df['F'] = df['A'].apply(categorize)
print(f"Categorical assignment with apply: {time.time() - start_time:.2f} seconds")

# 更好的做法:使用cut
start_time = time.time()
df['F'] = pd.cut(df['A'], bins=[0, 0.3, 0.7, 1], labels=['low', 'medium', 'high'])
print(f"Categorical assignment with cut: {time.time() - start_time:.2f} seconds")

# 3. 使用isin代替多个OR条件
# 不好的做法
start_time = time.time()
filtered = df[(df['C'] == 'low') | (df['C'] == 'medium')]
print(f"Filter with OR: {time.time() - start_time:.2f} seconds")

# 好的做法
start_time = time.time()
filtered = df[df['C'].isin(['low', 'medium'])]
print(f"Filter with isin: {time.time() - start_time:.2f} seconds")

# 4. 使用query方法进行复杂过滤
# 不好的做法
start_time = time.time()
complex_filtered = df[(df['A'] > 0.5) & (df['B'] < 0.5) & (df['C'] == 'high')]
print(f"Complex filter with boolean indexing: {time.time() - start_time:.2f} seconds")

# 好的做法
start_time = time.time()
complex_filtered = df.query('A > 0.5 and B < 0.5 and C == "high"')
print(f"Complex filter with query: {time.time() - start_time:.2f} seconds")

# 5. 使用eval进行复杂计算
# 不好的做法
start_time = time.time()
df['G'] = df['A'] * df['B'] + (df['A'] / df['B'])
print(f"Complex calculation with standard operators: {time.time() - start_time:.2f} seconds")

# 好的做法
start_time = time.time()
df['G'] = df.eval('A * B + (A / B)')
print(f"Complex calculation with eval: {time.time() - start_time:.2f} seconds")

# 6. 使用groupby的agg方法一次性计算多个统计量
# 不好的做法
start_time = time.time()
grouped = df.groupby('C')
mean_a = grouped['A'].mean()
mean_b = grouped['B'].mean()
count = grouped.size()
result = pd.DataFrame({'mean_A': mean_a, 'mean_B': mean_b, 'count': count})
print(f"Multiple aggregations separately: {time.time() - start_time:.2f} seconds")

# 好的做法:使用命名聚合一次性计算(分组键列本身不能出现在聚合字典中)
start_time = time.time()
result = df.groupby('C').agg(
    mean_A=('A', 'mean'),
    mean_B=('B', 'mean'),
    count=('A', 'size')
)
print(f"Multiple aggregations with agg: {time.time() - start_time:.2f} seconds")

8.3 内存管理最佳实践

import pandas as pd
import numpy as np
import gc

# 1. 及时删除不再需要的大型DataFrame
def process_large_data():
    # 创建大型DataFrame
    large_df = pd.DataFrame(np.random.rand(1000000, 10))

    # 处理数据
    processed_df = large_df.mean(axis=1).to_frame()

    # 及时删除不再需要的大型DataFrame
    del large_df
    gc.collect()  # 强制垃圾回收

    return processed_df

result_df = process_large_data()
print("Processed data shape:", result_df.shape)

# 2. 使用迭代器处理大型数据集
def process_in_chunks(file_path, chunk_size=100000):
    # 跨块累积每个类别的总和与计数,最后再求均值
    # (直接把各块的均值相加会得到错误结果)
    sums = pd.Series(dtype='float64')
    counts = pd.Series(dtype='float64')

    # 分块读取和处理
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        grouped = chunk.groupby('category')['value']
        sums = sums.add(grouped.sum(), fill_value=0)
        counts = counts.add(grouped.count(), fill_value=0)

        # 显式删除数据块以释放内存
        del chunk
        gc.collect()

    # 计算每个类别的均值
    final_result = sums / counts
    return final_result

# 3. 使用适当的数据类型减少内存使用
def optimize_memory_usage(df):
    # 创建副本
    df_opt = df.copy()

    # 优化整数列
    for col in df_opt.select_dtypes(include=['int64']).columns:
        df_opt[col] = pd.to_numeric(df_opt[col], downcast='integer')

    # 优化浮点数列
    for col in df_opt.select_dtypes(include=['float64']).columns:
        df_opt[col] = pd.to_numeric(df_opt[col], downcast='float')

    # 优化对象类型列
    for col in df_opt.select_dtypes(include=['object']).columns:
        num_unique = df_opt[col].nunique()
        if num_unique / len(df_opt[col]) < 0.5:  # 如果唯一值比例小于50%
            df_opt[col] = df_opt[col].astype('category')

    return df_opt

# 测试内存优化
original_df = pd.DataFrame({
    'int_col': np.random.randint(0, 100, size=1000000),
    'float_col': np.random.rand(1000000),
    'str_col': np.random.choice(['low', 'medium', 'high'], size=1000000)
})

print("Original memory usage:", original_df.memory_usage(deep=True).sum() / (1024 * 1024), "MB")

optimized_df = optimize_memory_usage(original_df)
print("Optimized memory usage:", optimized_df.memory_usage(deep=True).sum() / (1024 * 1024), "MB")

# 4. 使用稀疏数据结构处理包含大量缺失值或重复值的数据
# 创建包含大量零值的DataFrame
sparse_data = pd.DataFrame(
    np.random.choice([0, 1, 2, 3], size=(10000, 100), p=[0.9, 0.03, 0.03, 0.04])
)

# 转换为稀疏DataFrame
sparse_df = sparse_data.astype(pd.SparseDtype("int", fill_value=0))

print("Dense memory usage:", sparse_data.memory_usage().sum() / (1024 * 1024), "MB")
print("Sparse memory usage:", sparse_df.memory_usage().sum() / (1024 * 1024), "MB")

# 5. 使用内存映射文件处理非常大的数组
# 创建一个非常大的数组
large_array = np.random.rand(10000, 10000)

# 保存到磁盘
np.save('large_array.npy', large_array)

# 使用内存映射加载
mmap_array = np.load('large_array.npy', mmap_mode='r')

# 现在可以像普通数组一样访问mmap_array,但它不会完全加载到内存中
print("Shape of memory-mapped array:", mmap_array.shape)
print("First element:", mmap_array[0, 0])

结论

本文详细介绍了Python Pandas库中各种数据读取操作,从基础的CSV、Excel文件读取,到数据库、网络API数据获取,再到特殊格式如JSON、HTML数据的处理。我们还探讨了处理大数据的技巧,包括分块读取、使用Dask处理超大数据集,以及数据类型优化等方法。

通过实际案例,我们展示了如何解决真实世界中的数据处理问题,包括数据清洗与预处理、多源数据整合、时间序列数据分析等。最后,我们分享了性能优化和最佳实践,帮助读者提高数据处理效率。

掌握这些数据读取和处理技巧,将使你能够更高效地处理各种数据分析任务,从基础的数据探索到复杂的数据分析项目。Pandas作为Python数据分析的核心工具,其强大的数据读取和处理能力为数据科学工作提供了坚实的基础。