メインコンテンツまでスキップ

MySQL 接続ガイド

アプリケーションから MySQL データベースに接続する方法を説明します。

概要

各テナント・環境では、MySQL Flexible Server が自動的にプロビジョニングされます。

MySQL 構成: ホスト: mysql-{tenant}-{env}.mysql.database.azure.com、ポート: 3306、データベース: {tenant}_{env}_db、SSL: 必須(Azure標準)

対象テンプレート: frontend-backend(Backend コンテナ)、monolith(Frontend (Monolith) コンテナ)


環境変数

MySQL への接続に必要な環境変数は、Terraform によって自動的に設定されます。

提供される環境変数: DATABASE_URL(完全な接続 URL、ランタイム別形式)、DB_URL(SQLAlchemy 用 URL、DB 名なし)、MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE

重要な注意事項:

  1. DATABASE_URL の形式はランタイムによって異なります: runtime = "node" (デフォルト) は mysql:// 形式、runtime = "python"mysql+pymysql:// 形式(SQLAlchemy 互換)
  2. DB_URL はデータベース名を含みません: pj-shared-fastapi などで使用されます。データベース名は MYSQL_DATABASE から取得します
  3. これらの環境変数は .env ファイルに記載しても自動上書きされます

詳細は Terraform が提供する環境変数仕様 を参照。


接続方法

Python での接続

1. PyMySQL を使用(基本)

インストール:

pip install pymysql cryptography

接続例:

import os
import pymysql

# 環境変数から接続情報を取得
host = os.getenv("MYSQL_HOST")
port = int(os.getenv("MYSQL_PORT", 3306))
user = os.getenv("MYSQL_USER")
password = os.getenv("MYSQL_PASSWORD")
database = os.getenv("MYSQL_DATABASE")

# 接続
connection = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
database=database,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)

try:
with connection.cursor() as cursor:
# クエリの実行
cursor.execute("SELECT * FROM users LIMIT 10")
result = cursor.fetchall()
print(result)
finally:
connection.close()

2. SQLAlchemy を使用(ORM)

インストール:

pip install sqlalchemy pymysql

接続例:

import os
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# DATABASE_URLを環境変数から取得
database_url = os.getenv("DATABASE_URL")

# エンジンの作成
engine = create_engine(
database_url,
pool_pre_ping=True, # 接続の健全性チェック
pool_size=10, # コネクションプールサイズ
max_overflow=20 # 最大オーバーフロー
)

# セッションの作成
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 使用例
def get_users():
session = SessionLocal()
try:
result = session.execute(text("SELECT * FROM users LIMIT 10"))
users = result.fetchall()
return users
finally:
session.close()

3. FastAPI での使用

依存関係の設定:

import os
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# DATABASE_URLを環境変数から取得
SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL")

engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 依存性注入
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

モデルの定義:

from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime

class User(Base):
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True)
email = Column(String(255), unique=True, index=True)
name = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow)

API エンドポイント:

from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session

app = FastAPI()

@app.get("/users")
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
users = db.query(User).offset(skip).limit(limit).all()
return users

@app.post("/users")
def create_user(email: str, name: str, db: Session = Depends(get_db)):
user = User(email=email, name=name)
db.add(user)
db.commit()
db.refresh(user)
return user

4. pj-shared-fastapi を使用

GenerativeX の pj-shared-fastapi を使用する場合:

import os
from pj_shared_fastapi.database import get_db_engine, get_session

# DB_URLとMYSQL_DATABASEを使用
db_url = os.getenv("DB_URL")
database = os.getenv("MYSQL_DATABASE")

# エンジンの作成
engine = get_db_engine(db_url, database)

# セッションの取得
session = get_session(engine)

Node.js での接続

1. mysql2 を使用(基本)

インストール:

npm install mysql2

接続例:

const mysql = require('mysql2');

// 環境変数から接続情報を取得
const connection = mysql.createConnection({
host: process.env.MYSQL_HOST,
port: process.env.MYSQL_PORT || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE
});

// 接続
connection.connect((err) => {
if (err) {
console.error('Connection error:', err);
return;
}
console.log('Connected to MySQL');
});

// クエリの実行
connection.query('SELECT * FROM users LIMIT 10', (err, results) => {
if (err) {
console.error('Query error:', err);
return;
}
console.log(results);
});

// 接続を閉じる
connection.end();

2. mysql2 を使用(Promise / async-await)

const mysql = require('mysql2/promise');

async function getUsers() {
// コネクションの作成
const connection = await mysql.createConnection({
host: process.env.MYSQL_HOST,
port: process.env.MYSQL_PORT || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE
});

try {
// クエリの実行
const [rows, fields] = await connection.execute(
'SELECT * FROM users LIMIT 10'
);
console.log(rows);
return rows;
} finally {
await connection.end();
}
}

getUsers().catch(console.error);

3. コネクションプールの使用

本番環境では、コネクションプールの使用を推奨します:

const mysql = require('mysql2/promise');

// コネクションプールの作成
const pool = mysql.createPool({
host: process.env.MYSQL_HOST,
port: process.env.MYSQL_PORT || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE,
waitForConnections: true,
connectionLimit: 10, // 最大接続数
queueLimit: 0,
enableKeepAlive: true,
keepAliveInitialDelay: 0
});

// 使用例
async function getUsers() {
const connection = await pool.getConnection();
try {
const [rows] = await connection.execute('SELECT * FROM users LIMIT 10');
return rows;
} finally {
connection.release(); // コネクションをプールに返却
}
}

// または、プールから直接実行
async function getUsersSimple() {
const [rows] = await pool.execute('SELECT * FROM users LIMIT 10');
return rows;
}

4. Prisma ORM を使用

インストール:

npm install @prisma/client
npm install -D prisma

初期化:

npx prisma init

スキーマ定義 (prisma/schema.prisma):

datasource db {
provider = "mysql"
url = env("DATABASE_URL")
}

generator client {
provider = "prisma-client-js"
}

model User {
id Int @id @default(autoincrement())
email String @unique
name String?
createdAt DateTime @default(now()) @map("created_at")

@@map("users")
}

使用例:

const { PrismaClient } = require('@prisma/client');

const prisma = new PrismaClient();

async function main() {
// ユーザーの取得
const users = await prisma.user.findMany({
take: 10,
});
console.log(users);

// ユーザーの作成
const newUser = await prisma.user.create({
data: {
email: 'test@example.com',
name: 'Test User',
},
});
console.log(newUser);
}

main()
.catch((e) => console.error(e))
.finally(async () => {
await prisma.$disconnect();
});

使用例

シナリオ 1: ユーザー情報の取得と作成(FastAPI)

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# データベース設定
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# モデル定義
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(255), unique=True, index=True)
name = Column(String(100))
created_at = Column(DateTime, default=datetime.utcnow)

# FastAPIアプリ
app = FastAPI()

def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return {
"id": user.id,
"email": user.email,
"name": user.name,
"created_at": user.created_at.isoformat()
}

@app.post("/users")
def create_user(email: str, name: str, db: Session = Depends(get_db)):
# メールアドレスの重複チェック
existing_user = db.query(User).filter(User.email == email).first()
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")

# ユーザーの作成
user = User(email=email, name=name)
db.add(user)
db.commit()
db.refresh(user)
return {
"id": user.id,
"email": user.email,
"name": user.name,
"created_at": user.created_at.isoformat()
}

シナリオ 2: トランザクション処理(Node.js)

const mysql = require('mysql2/promise');

const pool = mysql.createPool({
host: process.env.MYSQL_HOST,
port: process.env.MYSQL_PORT || 3306,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE,
waitForConnections: true,
connectionLimit: 10
});

async function transferPoints(fromUserId, toUserId, points) {
const connection = await pool.getConnection();

try {
// トランザクション開始
await connection.beginTransaction();

// 送信元のポイントを減らす
await connection.execute(
'UPDATE users SET points = points - ? WHERE id = ? AND points >= ?',
[points, fromUserId, points]
);

// 送信先のポイントを増やす
await connection.execute(
'UPDATE users SET points = points + ? WHERE id = ?',
[points, toUserId]
);

// トランザクションをコミット
await connection.commit();
console.log('Transfer successful');
} catch (error) {
// エラー時はロールバック
await connection.rollback();
console.error('Transfer failed, rolled back:', error);
throw error;
} finally {
connection.release();
}
}

// 使用例
transferPoints(1, 2, 100)
.then(() => console.log('Done'))
.catch(console.error);

ベストプラクティス

1. コネクションプールを使用する

理由: 接続の作成・破棄はコストが高いため、コネクションプールで再利用します。

Python (SQLAlchemy):

engine = create_engine(
database_url,
pool_size=10, # プールサイズ
max_overflow=20, # 最大オーバーフロー
pool_pre_ping=True, # 接続の健全性チェック
pool_recycle=3600 # 1時間で接続を再作成
)

Node.js (mysql2):

const pool = mysql.createPool({
connectionLimit: 10,
waitForConnections: true,
queueLimit: 0
});

2. 環境変数を直接使用する

良い例:

import os
database_url = os.getenv("DATABASE_URL")

悪い例:

# ハードコードしない
database_url = "mysql://user:pass@host/db"

3. 接続エラーを適切にハンドリングする

from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError

try:
engine = create_engine(database_url)
connection = engine.connect()
except OperationalError as e:
print(f"Database connection failed: {e}")
# リトライロジックやアラート通知

4. SQL インジェクション対策

必ずパラメータ化されたクエリを使用:

良い例 (Python):

cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))

悪い例 (Python):

# SQL インジェクションのリスク
cursor.execute(f"SELECT * FROM users WHERE id = {user_id}")

良い例 (Node.js):

await connection.execute('SELECT * FROM users WHERE id = ?', [userId]);

悪い例 (Node.js):

// SQL インジェクションのリスク
await connection.execute(`SELECT * FROM users WHERE id = ${userId}`);

5. トランザクションを適切に使用する

複数の操作を1つのトランザクションで実行することで、データの整合性を保ちます。

Python:

from sqlalchemy.orm import Session

def update_user_and_log(db: Session, user_id: int, new_name: str):
try:
user = db.query(User).filter(User.id == user_id).first()
user.name = new_name

log = AuditLog(user_id=user_id, action="update_name")
db.add(log)

db.commit()
except Exception as e:
db.rollback()
raise

Node.js:

async function updateUserAndLog(userId, newName) {
const connection = await pool.getConnection();
try {
await connection.beginTransaction();

await connection.execute(
'UPDATE users SET name = ? WHERE id = ?',
[newName, userId]
);

await connection.execute(
'INSERT INTO audit_logs (user_id, action) VALUES (?, ?)',
[userId, 'update_name']
);

await connection.commit();
} catch (error) {
await connection.rollback();
throw error;
} finally {
connection.release();
}
}

トラブルシューティング

接続できない

症状: OperationalErrorConnection refused

原因と対処:

  1. 環境変数が正しく設定されているか確認
import os
print("MYSQL_HOST:", os.getenv("MYSQL_HOST"))
print("MYSQL_USER:", os.getenv("MYSQL_USER"))
print("MYSQL_DATABASE:", os.getenv("MYSQL_DATABASE"))
# パスワードは表示しない
  1. MySQL サーバーが起動しているか確認

Azure Portal で MySQL Flexible Server の状態を確認します。

  1. ネットワークルールを確認

Container Apps からの接続が許可されているか確認します(通常は自動設定されています)。

SSL エラー

症状: SSL connection error

対処: Azure MySQL は SSL 接続が必須です。ライブラリの設定を確認してください。

Python (PyMySQL):

connection = pymysql.connect(
host=host,
ssl={'ssl': True} # SSL有効化
)

Python (SQLAlchemy):

engine = create_engine(
database_url,
connect_args={"ssl": {"ssl": True}}
)

タイムアウトエラー

症状: Connection timeout

原因と対処:

  1. 接続プールの設定を調整
engine = create_engine(
database_url,
pool_pre_ping=True, # 接続の健全性チェック
pool_recycle=3600 # 1時間で接続を再作成
)
  1. Container Apps のタイムアウト設定を確認

infra.config.ymltimeout を調整できます。

文字化け

症状: 日本語が文字化けする

対処: 文字セットを utf8mb4 に設定してください。

Python:

connection = pymysql.connect(
host=host,
charset='utf8mb4'
)

Node.js:

const connection = mysql.createConnection({
host: process.env.MYSQL_HOST,
charset: 'utf8mb4'
});

まとめ

  • 環境変数は自動設定される: DATABASE_URLMYSQL_* は Terraform が設定
  • コネクションプールを使用: 本番環境では必須
  • パラメータ化されたクエリを使用: SQL インジェクション対策
  • トランザクションを適切に使用: データの整合性を保つ

詳細は以下のドキュメントも参照してください: