SQL Database MCP: How to Use MCP Servers with SQL Databases
Introduction
Model Context Protocol (MCP) establishes a standardized communication framework between large language models (LLMs) and relational database management systems (RDBMS). The protocol enables bidirectional, stateful communication that goes beyond the limitations of traditional SQL clients through a well-defined protocol layer. By implementing MCP servers as middleware between LLMs and SQL databases, developers can expose database functionality through a semantically rich context layer rather than direct query execution, maintaining robust security boundaries while significantly extending model capabilities for complex data operations.
The technical implementation of MCP with SQL databases addresses fundamental challenges in data access patterns, query optimization, and security that conventional API interfaces cannot efficiently resolve. This protocol layer abstracts the underlying database complexity while providing a standardized interface that supports streaming results, stateful transactions, and schema introspection through a unified communication channel.
Explore a reference SQL MCP server implementation in this repository: github.com/RichardHan/mssql_mcp_server
MCP Architecture for SQL Databases
The SQL Database MCP architecture implements a specialized middleware layer that bridges the semantic gap between LLM reasoning and relational database operations. This architecture comprises several critical components:
Protocol Specification
The SQL MCP protocol defines a structured JSON-based message format that encapsulates database operations:
{
"message_id": "req_6e7ba0fc-543b-4b8c-b03d-7c117d6c7e45",
"timestamp": "2025-04-04T15:23:47.089Z",
"source": "model",
"destination": "sql_server",
"message_type": "request",
"content": {
"operation": "query",
"parameters": {
"sql": "SELECT product_id, name, price FROM products WHERE category = @category AND price < @maxPrice",
"bindings": {
"@category": "electronics",
"@maxPrice": 500
},
"options": {
"timeout": 5000,
"maxRows": 100
}
}
},
"metadata": {
"session_id": "sess_abc123",
"transaction_id": "tx_789xyz"
}
}
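A successful reply travels back over the same envelope. The example below mirrors the response constructed by the server implementation later in this article; the field values are illustrative:

{
  "message_id": "resp_9f8e7d6c-1a2b-4c3d-8e9f-0a1b2c3d4e5f",
  "in_reply_to": "req_6e7ba0fc-543b-4b8c-b03d-7c117d6c7e45",
  "timestamp": "2025-04-04T15:23:47.412Z",
  "source": "sql_server",
  "destination": "model",
  "message_type": "response",
  "content": {
    "recordsets": [
      [
        { "product_id": 42, "name": "Wireless Mouse", "price": 24.99 }
      ]
    ],
    "rowsAffected": [1],
    "output": {},
    "returnValue": 0
  },
  "metadata": {
    "session_id": "sess_abc123",
    "transaction_id": "tx_789xyz",
    "processing_time_ms": 37
  }
}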
Server Components
A comprehensive SQL MCP server implementation contains these core modules:
- Request Handler: Processes incoming requests, validates message format, and routes to appropriate database operation handlers
- Query Processor: Parses SQL statements, performs security validation, and manages execution context
- Parameter Binding: Securely binds parameters to prevent SQL injection vulnerabilities
- Result Formatter: Transforms database result sets into protocol-compliant response messages
- Transaction Manager: Maintains transaction state across multiple operations
- Schema Inspector: Provides metadata about database objects for context enrichment
- Security Layer: Implements access control and query analysis to prevent unauthorized operations
Implementing an SQL MCP Server
Environment Requirements
- Node.js ≥ 16.x
- Database driver for target SQL database (e.g., mssql, mysql2, pg)
- Express.js for HTTP server functionality
- Winston or similar for structured logging
- Redis (optional) for distributed session management
Server Implementation
The core server implementation requires several key components:
// sql-mcp-server.js
const express = require('express');
const cors = require('cors');
const winston = require('winston');
const { v4: uuidv4 } = require('uuid');
const { createDatabasePool } = require('./database');
const { validateRequest } = require('./validators');
const { executeOperation } = require('./operations');
// Configure logging
const logger = winston.createLogger({
level: process.env.LOG_LEVEL || 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.Console(),
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' })
]
});
// Initialize database connection pool
const dbPool = createDatabasePool({
  server: process.env.DB_SERVER,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  // In mssql, timeouts and pool settings are top-level config keys, not tedious options
  connectionTimeout: 30000,
  requestTimeout: 30000,
  pool: {
    max: 10,
    min: 0,
    idleTimeoutMillis: 30000
  },
  options: {
    encrypt: true,
    trustServerCertificate: process.env.NODE_ENV !== 'production'
  }
});
const app = express();
app.use(cors());
app.use(express.json({ limit: '10mb' }));
// MCP endpoint
app.post('/mcp', async (req, res) => {
const requestStart = Date.now();
try {
// Validate request structure
validateRequest(req.body);
const { message_id, content, metadata } = req.body;
const { operation, parameters } = content;
logger.info(`Processing SQL operation: ${operation}`, {
requestId: message_id,
sessionId: metadata?.session_id,
transactionId: metadata?.transaction_id
});
// Execute database operation
const result = await executeOperation({
operation,
parameters,
metadata,
dbPool,
logger
});
// Construct MCP response
const response = {
message_id: `resp_${uuidv4()}`,
in_reply_to: message_id,
timestamp: new Date().toISOString(),
source: 'sql_server',
destination: 'model',
message_type: 'response',
content: result,
metadata: {
...metadata,
processing_time_ms: Date.now() - requestStart
}
};
res.json(response);
} catch (error) {
logger.error(`Error processing request: ${error.message}`, {
stack: error.stack
});
// Construct error response
res.status(error.status || 500).json({
message_id: `err_${uuidv4()}`,
in_reply_to: req.body?.message_id || 'unknown',
timestamp: new Date().toISOString(),
source: 'sql_server',
destination: 'model',
message_type: 'error',
content: {
error: error.message,
code: error.code || 'INTERNAL_ERROR',
sql_state: error.sqlState,
sql_error_number: error.number
},
metadata: {
processing_time_ms: Date.now() - requestStart
}
});
}
});
const PORT = process.env.PORT || 3000;
// Establish the pool connection before accepting requests
dbPool.connect()
  .then(() => {
    app.listen(PORT, () => {
      logger.info(`SQL MCP Server running on port ${PORT}`);
    });
  })
  .catch(err => {
    logger.error(`Failed to connect to database: ${err.message}`);
    process.exit(1);
  });
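The validateRequest helper required above is not shown in this article; a minimal sketch, assuming only structural checks on the MCP envelope, might look like this:

// validators.js — a minimal structural validator (a sketch, not the full implementation)
function validateRequest(body) {
  // Reject anything that is not a well-formed MCP request envelope
  if (!body || typeof body !== 'object') {
    throw Object.assign(new Error('Request body must be a JSON object'), { status: 400 });
  }
  for (const field of ['message_id', 'message_type', 'content']) {
    if (!body[field]) {
      throw Object.assign(new Error(`Missing required field: ${field}`), { status: 400 });
    }
  }
  if (body.message_type !== 'request') {
    throw Object.assign(new Error(`Unsupported message_type: ${body.message_type}`), { status: 400 });
  }
  if (!body.content.operation) {
    throw Object.assign(new Error('content.operation is required'), { status: 400 });
  }
}

module.exports = { validateRequest };

The attached status property is picked up by the error handler above, which falls back to HTTP 500 when no status is set.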
Database Operations Implementation
The core functionality for handling different SQL operations:
// operations.js
const { beginTransaction, commitTransaction, rollbackTransaction, getTransaction } = require('./transactions');
async function executeOperation({ operation, parameters, metadata, dbPool, logger }) {
switch (operation) {
case 'query':
return await executeQuery(parameters, metadata, dbPool, logger);
case 'execute':
return await executeProcedure(parameters, metadata, dbPool, logger);
case 'begin_transaction':
return await beginTransaction(metadata, dbPool, logger);
case 'commit_transaction':
return await commitTransaction(metadata, dbPool, logger);
case 'rollback_transaction':
return await rollbackTransaction(metadata, dbPool, logger);
case 'get_schema':
return await getSchemaInformation(parameters, dbPool, logger);
default:
throw new Error(`Unsupported operation: ${operation}`);
}
}
async function executeQuery({ sql, bindings, options }, metadata, dbPool, logger) {
// Get transaction if one exists
const transactionId = metadata?.transaction_id;
const transaction = transactionId ? getTransaction(transactionId) : null;
// Create request object with appropriate transaction context
const request = transaction ? transaction.request() : dbPool.request();
// Apply timeout if specified
if (options?.timeout) {
request.timeout = options.timeout;
}
// Apply parameter bindings
if (bindings) {
Object.entries(bindings).forEach(([param, value]) => {
// Strip @ prefix if present
const paramName = param.startsWith('@') ? param.substring(1) : param;
request.input(paramName, value);
});
}
  // Execute query with a row count limit if specified; coerce maxRows to an
  // integer so it is safe to interpolate into the T-SQL SET ROWCOUNT statements
  let result;
  if (options?.maxRows) {
    const maxRows = parseInt(options.maxRows, 10);
    sql = `SET ROWCOUNT ${maxRows}; ${sql}; SET ROWCOUNT 0;`;
  }
  result = await request.query(sql);
// Format the result for MCP response
return {
recordsets: result.recordsets,
rowsAffected: result.rowsAffected,
output: result.output || {},
returnValue: result.returnValue
};
}
// Additional operation implementations...
module.exports = { executeOperation };
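The executeProcedure handler routed above is elided. A minimal sketch using mssql's request.execute, following the same transaction-resolution pattern as executeQuery, could look like this (the procedure parameter name is an assumption, not part of the protocol example shown earlier):

// Sketch of the 'execute' operation handler for stored procedures
async function executeProcedure({ procedure, bindings, options }, metadata, dbPool, logger) {
  const transactionId = metadata?.transaction_id;
  const transaction = transactionId ? getTransaction(transactionId) : null;
  const request = transaction ? transaction.request() : dbPool.request();
  if (options?.timeout) {
    request.timeout = options.timeout;
  }
  if (bindings) {
    Object.entries(bindings).forEach(([param, value]) => {
      const paramName = param.startsWith('@') ? param.substring(1) : param;
      request.input(paramName, value);
    });
  }
  // execute() runs a stored procedure and returns the same result shape as query()
  const result = await request.execute(procedure);
  return {
    recordsets: result.recordsets,
    rowsAffected: result.rowsAffected,
    output: result.output || {},
    returnValue: result.returnValue
  };
}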
Advanced Features
Schema Introspection
Implementing schema introspection allows LLMs to understand database structure:
async function getSchemaInformation({ objects, filter }, dbPool, logger) {
const objectTypes = objects || ['tables', 'columns', 'views', 'procedures'];
const results = {};
// Get tables
  if (objectTypes.includes('tables')) {
    const request = dbPool.request();
    if (filter) {
      // Bind the filter value instead of interpolating it, to prevent SQL injection
      request.input('filter', `%${filter}%`);
    }
    const tableQuery = `
SELECT
t.TABLE_SCHEMA as schema_name,
t.TABLE_NAME as table_name,
t.TABLE_TYPE as table_type
FROM
INFORMATION_SCHEMA.TABLES t
WHERE
t.TABLE_TYPE = 'BASE TABLE'
        ${filter ? 'AND t.TABLE_NAME LIKE @filter' : ''}
ORDER BY
t.TABLE_SCHEMA, t.TABLE_NAME
`;
    const tablesResult = await request.query(tableQuery);
results.tables = tablesResult.recordset;
}
// Get columns for tables
  if (objectTypes.includes('columns')) {
    const request = dbPool.request();
    if (filter) {
      // Bind the filter value instead of interpolating it, to prevent SQL injection
      request.input('filter', `%${filter}%`);
    }
    const columnsQuery = `
SELECT
c.TABLE_SCHEMA as schema_name,
c.TABLE_NAME as table_name,
c.COLUMN_NAME as column_name,
c.DATA_TYPE as data_type,
c.CHARACTER_MAXIMUM_LENGTH as max_length,
c.IS_NULLABLE as is_nullable,
COLUMNPROPERTY(OBJECT_ID(c.TABLE_SCHEMA + '.' + c.TABLE_NAME), c.COLUMN_NAME, 'IsIdentity') as is_identity,
CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END as is_primary_key
FROM
INFORMATION_SCHEMA.COLUMNS c
LEFT JOIN (
SELECT
k.TABLE_SCHEMA,
k.TABLE_NAME,
k.COLUMN_NAME
FROM
INFORMATION_SCHEMA.KEY_COLUMN_USAGE k
INNER JOIN
INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc ON k.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
WHERE
tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
) pk ON c.TABLE_SCHEMA = pk.TABLE_SCHEMA AND c.TABLE_NAME = pk.TABLE_NAME AND c.COLUMN_NAME = pk.COLUMN_NAME
        ${filter ? 'WHERE c.TABLE_NAME LIKE @filter' : ''}
ORDER BY
c.TABLE_SCHEMA, c.TABLE_NAME, c.ORDINAL_POSITION
`;
    const columnsResult = await request.query(columnsQuery);
results.columns = columnsResult.recordset;
}
// Additional schema queries for views, procedures, etc.
return results;
}
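With this handler in place, a model can request schema metadata through the same message envelope used for queries. An illustrative get_schema request:

{
  "message_id": "req_0c1d2e3f-4a5b-6c7d-8e9f-a0b1c2d3e4f5",
  "timestamp": "2025-04-04T15:24:02.118Z",
  "source": "model",
  "destination": "sql_server",
  "message_type": "request",
  "content": {
    "operation": "get_schema",
    "parameters": {
      "objects": ["tables", "columns"],
      "filter": "product"
    }
  },
  "metadata": {
    "session_id": "sess_abc123"
  }
}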
Transaction Management
Implementing stateful transaction management:
// transactions.js
const { v4: uuidv4 } = require('uuid');
// In-memory transaction store (use Redis for distributed systems)
const transactions = new Map();
async function beginTransaction(metadata, dbPool, logger) {
const transaction = dbPool.transaction();
const transactionId = `tx_${uuidv4()}`;
await transaction.begin();
// Store transaction with metadata
transactions.set(transactionId, {
transaction,
created: new Date().toISOString(),
    session_id: metadata?.session_id,
timeout: setTimeout(() => {
// Auto-rollback if transaction is not committed/rolled back
if (transactions.has(transactionId)) {
transaction.rollback()
.catch(err => logger.error(`Error auto-rolling back transaction: ${err.message}`));
transactions.delete(transactionId);
logger.warn(`Transaction ${transactionId} auto-rolled back due to timeout`);
}
}, 60000) // 1 minute timeout
});
return {
transaction_id: transactionId,
status: 'active',
created: new Date().toISOString()
};
}
async function commitTransaction(metadata, dbPool, logger) {
const { transaction_id } = metadata;
if (!transaction_id || !transactions.has(transaction_id)) {
throw new Error(`Invalid or expired transaction: ${transaction_id}`);
}
const txInfo = transactions.get(transaction_id);
try {
await txInfo.transaction.commit();
// Clean up
clearTimeout(txInfo.timeout);
transactions.delete(transaction_id);
return {
transaction_id,
status: 'committed',
committed_at: new Date().toISOString()
};
} catch (error) {
logger.error(`Error committing transaction: ${error.message}`);
throw error;
}
}
// Additional transaction operations...
module.exports = {
beginTransaction,
commitTransaction,
rollbackTransaction,
getTransaction: (id) => transactions.get(id)?.transaction
};
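The rollbackTransaction operation referenced in the exports mirrors the commit path:

async function rollbackTransaction(metadata, dbPool, logger) {
  const { transaction_id } = metadata;
  if (!transaction_id || !transactions.has(transaction_id)) {
    throw new Error(`Invalid or expired transaction: ${transaction_id}`);
  }
  const txInfo = transactions.get(transaction_id);
  try {
    await txInfo.transaction.rollback();
    // Clean up the stored transaction and its auto-rollback timer
    clearTimeout(txInfo.timeout);
    transactions.delete(transaction_id);
    return {
      transaction_id,
      status: 'rolled_back',
      rolled_back_at: new Date().toISOString()
    };
  } catch (error) {
    logger.error(`Error rolling back transaction: ${error.message}`);
    throw error;
  }
}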
Security Implementation
Query Whitelisting
Implementing query whitelisting for enhanced security:
// security.js
const { Parser } = require('node-sql-parser');
const parser = new Parser();
// Configuration for query whitelisting
const securityConfig = {
// Allowed table access patterns
allowedTables: {
'public': ['users', 'products', 'orders'],
'analytics': ['*'] // Allow all tables in analytics schema
},
// Disallowed SQL features
disallowedFeatures: ['insert', 'update', 'delete', 'create', 'drop', 'alter'],
// Maximum query complexity (joins, subqueries, etc.)
maxComplexity: 5
};
function validateQuery(sql) {
  try {
    // Parse SQL to an AST; node-sql-parser returns an array for multi-statement input
    let ast = parser.astify(sql);
    if (Array.isArray(ast)) {
      if (ast.length > 1) {
        throw new Error('Multiple SQL statements are not allowed');
      }
      ast = ast[0];
    }
    const statementType = ast.type.toLowerCase();
if (securityConfig.disallowedFeatures.includes(statementType)) {
throw new Error(`SQL operation not permitted: ${statementType}`);
}
// Validate table access
const tables = extractTablesFromAST(ast);
for (const table of tables) {
const { schema, name } = parseTableIdentifier(table);
// Check if schema is allowed
if (!securityConfig.allowedTables[schema]) {
throw new Error(`Access to schema '${schema}' is not permitted`);
}
// Check if table is allowed
      if (!securityConfig.allowedTables[schema].includes('*') &&
          !securityConfig.allowedTables[schema].includes(name)) {
throw new Error(`Access to table '${schema}.${name}' is not permitted`);
}
}
// Validate query complexity
const complexity = calculateQueryComplexity(ast);
if (complexity > securityConfig.maxComplexity) {
throw new Error(`Query complexity score (${complexity}) exceeds maximum allowed (${securityConfig.maxComplexity})`);
}
return true;
} catch (error) {
throw new Error(`SQL validation error: ${error.message}`);
}
}
// Helper functions for extraction and validation...
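The helper functions referenced above are left as an exercise; one possible shape, assuming node-sql-parser's AST layout for SELECT statements and defaulting unqualified tables to the 'public' schema used in the configuration, is sketched below:

// Collect table identifiers from the FROM clause (assumes node-sql-parser's
// { db, table, as } entries; subquery aliases are ignored)
function extractTablesFromAST(ast) {
  const tables = [];
  for (const item of ast.from || []) {
    if (item.table) {
      tables.push(item.db ? `${item.db}.${item.table}` : item.table);
    }
  }
  return tables;
}

// Split 'schema.table' into parts, defaulting to the 'public' schema
function parseTableIdentifier(table) {
  const parts = table.split('.');
  return parts.length === 2
    ? { schema: parts[0], name: parts[1] }
    : { schema: 'public', name: parts[0] };
}

// Rough heuristic: one point per joined table beyond the first,
// plus one per nested SELECT found anywhere in the AST
function calculateQueryComplexity(ast) {
  let score = 0;
  if (Array.isArray(ast.from)) {
    score += Math.max(0, ast.from.length - 1);
  }
  const stack = [...Object.values(ast)];
  while (stack.length > 0) {
    const node = stack.pop();
    if (node && typeof node === 'object') {
      if (node.type === 'select') score += 1;
      stack.push(...Object.values(node));
    }
  }
  return score;
}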
Performance Optimization
Connection Pooling Configuration
Optimizing database connection management:
// database.js
const sql = require('mssql');
function createDatabasePool(config) {
  // Apply performance optimizations; note that in mssql, pool settings are a
  // top-level config key (passed to tarn.js), not a tedious connection option
  const optimizedConfig = {
    ...config,
    options: {
      ...config.options,
      enableArithAbort: true,
      trustServerCertificate: process.env.NODE_ENV !== 'production'
    },
    pool: {
      ...config.pool,
      max: parseInt(process.env.DB_POOL_MAX || '10', 10),
      min: parseInt(process.env.DB_POOL_MIN || '0', 10),
      idleTimeoutMillis: parseInt(process.env.DB_POOL_IDLE_TIMEOUT || '30000', 10),
      acquireTimeoutMillis: parseInt(process.env.DB_POOL_ACQUIRE_TIMEOUT || '15000', 10),
      createTimeoutMillis: parseInt(process.env.DB_POOL_CREATE_TIMEOUT || '30000', 10),
      destroyTimeoutMillis: parseInt(process.env.DB_POOL_DESTROY_TIMEOUT || '5000', 10),
      reapIntervalMillis: parseInt(process.env.DB_POOL_REAP_INTERVAL || '1000', 10),
      createRetryIntervalMillis: parseInt(process.env.DB_POOL_RETRY_INTERVAL || '200', 10)
    }
  };
return new sql.ConnectionPool(optimizedConfig);
}
module.exports = { createDatabasePool };
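Because the pool is shared across all requests, it should also be closed cleanly on shutdown. A minimal sketch for the server entry point, using mssql's pool.close():

// Release pooled connections before the process exits
process.on('SIGTERM', async () => {
  try {
    await dbPool.close();
    logger.info('Database pool closed, shutting down');
  } catch (err) {
    logger.error(`Error closing database pool: ${err.message}`);
  } finally {
    process.exit(0);
  }
});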
Query Result Streaming
Implementing streaming for large result sets:
async function executeStreamingQuery({ sql, bindings, options }, res, dbPool) {
  const request = dbPool.request();
  // Enable streaming mode so rows are emitted as events rather than buffered in memory
  request.stream = true;
// Apply bindings
if (bindings) {
Object.entries(bindings).forEach(([param, value]) => {
const paramName = param.startsWith('@') ? param.substring(1) : param;
request.input(paramName, value);
});
}
// Start JSON response
res.writeHead(200, { 'Content-Type': 'application/json' });
res.write('{"message_type":"streaming_response","content":{"records":[');
let firstRow = true;
let rowCount = 0;
  // Stream results; in streaming mode, events are emitted on the request object itself
  request.query(sql);
  request.on('recordset', columns => {
    // Column metadata for the result set could be forwarded here if needed
  });
  request.on('row', row => {
if (!firstRow) {
res.write(',');
}
firstRow = false;
res.write(JSON.stringify(row));
rowCount++;
// Check if we've hit row limit
if (options?.maxRows && rowCount >= options.maxRows) {
      request.cancel();
}
});
  request.on('error', err => {
    // JSON-escape the message so the partially streamed payload remains valid JSON
    res.write(`],"error":${JSON.stringify(err.message)},"row_count":${rowCount}}}`);
res.end();
});
  request.on('done', result => {
// Finish JSON response
res.write(`],"row_count":${rowCount},"complete":true}}`);
res.end();
});
}
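Wiring this into the Express server could be done with a dedicated route; the '/mcp/stream' path below is illustrative, and the handler assumes it lives in the same module as the app and pool defined earlier:

// Hypothetical streaming endpoint reusing the envelope validation from /mcp
app.post('/mcp/stream', async (req, res) => {
  try {
    validateRequest(req.body);
    const { parameters } = req.body.content;
    await executeStreamingQuery(parameters, res, dbPool);
  } catch (error) {
    // Only send a standard error response if streaming has not started yet
    if (!res.headersSent) {
      res.status(error.status || 500).json({
        message_type: 'error',
        content: { error: error.message }
      });
    }
  }
});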
Conclusion
SQL Database MCP servers represent a significant advancement in database integration technology for AI systems. By implementing this standardized protocol layer, developers can create secure, efficient communication channels between LLMs and SQL databases that enable powerful data operations while maintaining strict security boundaries. The protocol abstracts the complexity of database operations while providing rich contextual information that enhances AI reasoning capabilities.
The future of SQL MCP technology points toward increased integration with AI systems, more sophisticated query understanding, and enhanced security features. By mastering the technical implementation details outlined in this article, developers can create robust, scalable database interfaces that significantly enhance the capabilities of LLMs across diverse application domains.