MySQL 接続ガイド
アプリケーションから MySQL データベースに接続する方法を説明します。
概要
各テナント・環境では、MySQL Flexible Server が自動的にプロビジョニングされます。
MySQL 構成: ホスト: mysql-{tenant}-{env}.mysql.database.azure.com、ポート: 3306、データベース: {tenant}_{env}_db、SSL: 必須(Azure標準)
対象テンプレート: frontend-backend(Backend コンテナ)、monolith(Frontend (Monolith) コンテナ)
環境変数
MySQL への接続に必要な環境変数は、Terraform によって自動的に設定されます。
提供される環境変数: DATABASE_URL(完全な接続 URL、ランタイム別形式)、DB_URL(SQLAlchemy 用 URL、DB 名なし)、MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE
重要な注意事項:
- DATABASE_URL の形式はランタイムによって異なります:
runtime = "node"(デフォルト) はmysql://形式、runtime = "python"はmysql+pymysql://形式(SQLAlchemy 互換) - DB_URL はデータベース名を含みません:
pj-shared-fastapiなどで使用されます。データベース名はMYSQL_DATABASEから取得します - これらの環境変数は
.envファイルに記載しても自動上書きされます
詳細は Terraform が提供する環境変数仕様 を参照。
接続方法
Python での接続
1. PyMySQL を使用(基本)
インストール:
pip install pymysql cryptography
接続例:
import os
import pymysql
# 環境変数から接続情報を取得
host = os.getenv("MYSQL_HOST")
port = int(os.getenv("MYSQL_PORT", 3306))
user = os.getenv("MYSQL_USER")
password = os.getenv("MYSQL_PASSWORD")
database = os.getenv("MYSQL_DATABASE")
# 接続
connection = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
database=database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
try:
with connection.cursor() as cursor:
# クエリの実行
cursor.execute("SELECT * FROM users LIMIT 10")
result = cursor.fetchall()
print(result)
finally:
connection.close()
2. SQLAlchemy を使用(ORM)
インストール:
pip install sqlalchemy pymysql
接続例:
import os
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
# DATABASE_URLを環境変数から取得
database_url = os.getenv("DATABASE_URL")
# エンジンの作成
engine = create_engine(
database_url,
pool_pre_ping=True, # 接続の健全性チェック
pool_size=10, # コネクションプールサイズ
max_overflow=20 # 最大オーバーフロー
)
# セッションの作成
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 使用例
def get_users():
session = SessionLocal()
try:
result = session.execute(text("SELECT * FROM users LIMIT 10"))
users = result.fetchall()
return users
finally:
session.close()
3. FastAPI での使用
依存関係の設定:
import os
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# DATABASE_URLを環境変数から取得
SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 依存性注入
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
モデルの定義:
from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(255), unique=True, index=True)
name = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow)
API エンドポイント:
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
app = FastAPI()
@app.get("/users")
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
users = db.query(User).offset(skip).limit(limit).all()
return users
@app.post("/users")
def create_user(email: str, name: str, db: Session = Depends(get_db)):
user = User(email=email, name=name)
db.add(user)
db.commit()
db.refresh(user)
return user
4. pj-shared-fastapi を使用
GenerativeX の pj-shared-fastapi を使用する場合:
import os
from pj_shared_fastapi.database import get_db_engine, get_session
# DB_URLとMYSQL_DATABASEを使用
db_url = os.getenv("DB_URL")
database = os.getenv("MYSQL_DATABASE")
# エンジンの作成
engine = get_db_engine(db_url, database)
# セッションの取得
session = get_session(engine)
Node.js での接続
1. mysql2 を使用(基本)
インストール:
npm install mysql2
接続例:
const mysql = require('mysql2');
// 環境変数から接続情報を取得
const connection = mysql.createConnection({
host: process.env.MYSQL_HOST,
port: process.env.MYSQL_PORT || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE
});
// 接続
connection.connect((err) => {
if (err) {
console.error('Connection error:', err);
return;
}
console.log('Connected to MySQL');
});
// クエリの実行
connection.query('SELECT * FROM users LIMIT 10', (err, results) => {
if (err) {
console.error('Query error:', err);
return;
}
console.log(results);
});
// 接続を閉じる
connection.end();
2. mysql2 を使用(Promise / async-await)
const mysql = require('mysql2/promise');
async function getUsers() {
// コネクションの作成
const connection = await mysql.createConnection({
host: process.env.MYSQL_HOST,
port: process.env.MYSQL_PORT || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE
});
try {
// クエリの実行
const [rows, fields] = await connection.execute(
'SELECT * FROM users LIMIT 10'
);
console.log(rows);
return rows;
} finally {
await connection.end();
}
}
getUsers().catch(console.error);
3. コネクションプールの使用
本番環境では、コネクションプールの使用を推奨します:
const mysql = require('mysql2/promise');
// コネクションプールの作成
const pool = mysql.createPool({
host: process.env.MYSQL_HOST,
port: process.env.MYSQL_PORT || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE,
waitForConnections: true,
connectionLimit: 10, // 最大接続数
queueLimit: 0,
enableKeepAlive: true,
keepAliveInitialDelay: 0
});
// 使用例
async function getUsers() {
const connection = await pool.getConnection();
try {
const [rows] = await connection.execute('SELECT * FROM users LIMIT 10');
return rows;
} finally {
connection.release(); // コネクションをプールに返却
}
}
// または、プールから直接実行
async function getUsersSimple() {
const [rows] = await pool.execute('SELECT * FROM users LIMIT 10');
return rows;
}
4. Prisma ORM を使用
インストール:
npm install @prisma/client
npm install -D prisma
初期化:
npx prisma init
スキーマ定義 (prisma/schema.prisma):
datasource db {
provider = "mysql"
url = env("DATABASE_URL")
}
generator client {
provider = "prisma-client-js"
}
model User {
id Int @id @default(autoincrement())
email String @unique
name String?
createdAt DateTime @default(now()) @map("created_at")
@@map("users")
}
使用例:
const { PrismaClient } = require('@prisma/client');
const prisma = new PrismaClient();
async function main() {
// ユーザーの取得
const users = await prisma.user.findMany({
take: 10,
});
console.log(users);
// ユーザーの作成
const newUser = await prisma.user.create({
data: {
email: 'test@example.com',
name: 'Test User',
},
});
console.log(newUser);
}
main()
.catch((e) => console.error(e))
.finally(async () => {
await prisma.$disconnect();
});
使用例
シナリオ 1: ユーザー情報の取得と作成(FastAPI)
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# データベース設定
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# モデル定義
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(255), unique=True, index=True)
name = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow)
# FastAPIアプリ
app = FastAPI()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return {
"id": user.id,
"email": user.email,
"name": user.name,
"created_at": user.created_at.isoformat()
}
@app.post("/users")
def create_user(email: str, name: str, db: Session = Depends(get_db)):
# メールアドレスの重複チェック
existing_user = db.query(User).filter(User.email == email).first()
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
# ユーザーの作成
user = User(email=email, name=name)
db.add(user)
db.commit()
db.refresh(user)
return {
"id": user.id,
"email": user.email,
"name": user.name,
"created_at": user.created_at.isoformat()
}
シナリオ 2: トランザクション処理(Node.js)
const mysql = require('mysql2/promise');
const pool = mysql.createPool({
host: process.env.MYSQL_HOST,
port: process.env.MYSQL_PORT || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE,
waitForConnections: true,
connectionLimit: 10
});
async function transferPoints(fromUserId, toUserId, points) {
const connection = await pool.getConnection();
try {
// トランザクション開始
await connection.beginTransaction();
// 送信元のポイントを減らす
await connection.execute(
'UPDATE users SET points = points - ? WHERE id = ? AND points >= ?',
[points, fromUserId, points]
);
// 送信先のポイントを増やす
await connection.execute(
'UPDATE users SET points = points + ? WHERE id = ?',
[points, toUserId]
);
// トランザクションをコミット
await connection.commit();
console.log('Transfer successful');
} catch (error) {
// エラー時はロールバック
await connection.rollback();
console.error('Transfer failed, rolled back:', error);
throw error;
} finally {
connection.release();
}
}
// 使用例
transferPoints(1, 2, 100)
.then(() => console.log('Done'))
.catch(console.error);
ベストプラクティス
1. コネクションプールを使用する
理由: 接続の作成・破棄はコストが高いため、コネクションプールで再利用します。
Python (SQLAlchemy):
engine = create_engine(
database_url,
pool_size=10, # プールサイズ
max_overflow=20, # 最大オーバーフロー
pool_pre_ping=True, # 接続の健全性チェック
pool_recycle=3600 # 1時間で接続を再作成
)
Node.js (mysql2):
const pool = mysql.createPool({
connectionLimit: 10,
waitForConnections: true,
queueLimit: 0
});
2. 環境変数を直接使用する
良い例:
import os
database_url = os.getenv("DATABASE_URL")
悪い例:
# ハードコードしない
database_url = "mysql://user:pass@host/db"
3. 接続エラーを適切にハンドリングする
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
try:
engine = create_engine(database_url)
connection = engine.connect()
except OperationalError as e:
print(f"Database connection failed: {e}")
# リトライロジックやアラート通知
4. SQL インジェクション対策
必ずパラメータ化されたクエリを使用:
良い例 (Python):
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
悪い例 (Python):
# SQL インジェクションのリスク
cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")
良い例 (Node.js):
await connection.execute('SELECT * FROM users WHERE id = ?', [userId]);
悪い例 (Node.js):
// SQL インジェクションのリスク
await connection.execute(`SELECT * FROM users WHERE id = ${userId}`);
5. トランザクションを適切に使用する
複数の操作を1つのトランザクションで実行することで、データの整合性を保ちます。
Python:
from sqlalchemy.orm import Session
def update_user_and_log(db: Session, user_id: int, new_name: str):
try:
user = db.query(User).filter(User.id == user_id).first()
user.name = new_name
log = AuditLog(user_id=user_id, action="update_name")
db.add(log)
db.commit()
except Exception as e:
db.rollback()
raise
Node.js:
async function updateUserAndLog(userId, newName) {
const connection = await pool.getConnection();
try {
await connection.beginTransaction();
await connection.execute(
'UPDATE users SET name = ? WHERE id = ?',
[newName, userId]
);
await connection.execute(
'INSERT INTO audit_logs (user_id, action) VALUES (?, ?)',
[userId, 'update_name']
);
await connection.commit();
} catch (error) {
await connection.rollback();
throw error;
} finally {
connection.release();
}
}
トラブルシューティング
接続できない
症状: OperationalError や Connection refused
原因と対処:
- 環境変数が正しく設定されているか確認
import os
print("MYSQL_HOST:", os.getenv("MYSQL_HOST"))
print("MYSQL_USER:", os.getenv("MYSQL_USER"))
print("MYSQL_DATABASE:", os.getenv("MYSQL_DATABASE"))
# パスワードは表示しない
- MySQL サーバーが起動しているか確認
Azure Portal で MySQL Flexible Server の状態を確認します。
- ネットワークルールを確認
Container Apps からの接続が許可されているか確認します(通常は自動設定されています)。
SSL エラー
症状: SSL connection error
対処: Azure MySQL は SSL 接続が必須です。ライブラリの設定を確認してください。
Python (PyMySQL):
connection = pymysql.connect(
host=host,
ssl={'ssl': True} # SSL有効化
)
Python (SQLAlchemy):
engine = create_engine(
database_url,
connect_args={"ssl": {"ssl": True}}
)
タイムアウトエラー
症状: Connection timeout
原因と対処:
- 接続プールの設定を調整
engine = create_engine(
database_url,
pool_pre_ping=True, # 接続の健全性チェック
pool_recycle=3600 # 1時間で接続を再作成
)
- Container Apps のタイムアウト設定を確認
infra.config.yml で timeout を調整できます。
文字化け
症状: 日本語が文字化けする
対処: 文字セットを utf8mb4 に設定してください。
Python:
connection = pymysql.connect(
host=host,
charset='utf8mb4'
)
Node.js:
const connection = mysql.createConnection({
host: process.env.MYSQL_HOST,
charset: 'utf8mb4'
});
まとめ
- 環境変数は自動設定される:
DATABASE_URLやMYSQL_*は Terraform が設定 - コネクションプールを使用: 本番環境では必須
- パラメータ化されたクエリを使用: SQL インジェクション対策
- トランザクションを適切に使用: データの整合性を保つ
詳細は以下のドキュメントも参照してください:
- 環境変数ガイド - 環境変数の取得方法
- データベースマイグレーション - DB マイグレーションの実行方法
- Terraform が提供する環境変数仕様 - 環境変数の詳細仕様