SQL Database MCP: How to Use MCP Servers with SQL Databases

Introduction

Model Context Protocol (MCP) standardizes how large language models (LLMs) communicate with external systems, including relational database management systems (RDBMS). An MCP server sits as middleware between the LLM and the SQL database: rather than handing the model a raw connection, it exposes a defined set of operations (queries, transactions, schema lookups) over a structured protocol. The model gets the context it needs for complex data operations, while the server keeps control over what can be executed and by whom.

Placing a protocol layer in front of the database addresses recurring problems in data access, query control, and security that ad hoc API endpoints handle poorly. The layer hides database-specific details behind a single interface that supports streaming results, stateful transactions, and schema introspection.

Explore the comprehensive SQL MCP server implementation in the official repository: github.com/RichardHan/mssql_mcp_server

MCP Architecture for SQL Databases

The SQL Database MCP architecture implements a specialized middleware layer that bridges the semantic gap between LLM reasoning and relational database operations. This architecture comprises several critical components:

Protocol Specification

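The server described in this article wraps each database operation in a structured JSON message. The MCP specification itself frames messages as JSON-RPC 2.0; the envelope below is this article's own illustrative format: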

{
  "message_id": "req_6e7ba0fc-543b-4b8c-b03d-7c117d6c7e45",
  "timestamp": "2025-04-04T15:23:47.089Z",
  "source": "model",
  "destination": "sql_server",
  "message_type": "request",
  "content": {
    "operation": "query",
    "parameters": {
      "sql": "SELECT product_id, name, price FROM products WHERE category = @category AND price < @maxPrice",
      "bindings": {
        "@category": "electronics",
        "@maxPrice": 500
      },
      "options": {
        "timeout": 5000,
        "maxRows": 100
      }
    }
  },
  "metadata": {
    "session_id": "sess_abc123",
    "transaction_id": "tx_789xyz"
  }
}
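The server code later in this article imports a validateRequest function from ./validators without showing it. A minimal sketch of what such a validator might look like follows, assuming the envelope fields above and the operation names handled in operations.js. The badRequest helper, the INVALID_REQUEST code, and the exact checks are assumptions for illustration, not part of any published specification.

```javascript
// Hypothetical validators.js: checks only the envelope fields used in this article.
const SUPPORTED_OPERATIONS = new Set([
  'query', 'execute', 'begin_transaction',
  'commit_transaction', 'rollback_transaction', 'get_schema'
]);

// Build an error that the HTTP layer can map to a 400 response via error.status
function badRequest(message) {
  const error = new Error(message);
  error.status = 400;
  error.code = 'INVALID_REQUEST';
  return error;
}

function validateRequest(body) {
  if (body === null || typeof body !== 'object' || Array.isArray(body)) {
    throw badRequest('Request body must be a JSON object');
  }
  if (typeof body.message_id !== 'string' || body.message_id.length === 0) {
    throw badRequest('message_id must be a non-empty string');
  }
  if (body.message_type !== 'request') {
    throw badRequest('message_type must be "request"');
  }
  const content = body.content;
  if (content === null || typeof content !== 'object') {
    throw badRequest('content must be an object');
  }
  if (!SUPPORTED_OPERATIONS.has(content.operation)) {
    throw badRequest(`Unsupported operation: ${content.operation}`);
  }
  if (content.operation === 'query') {
    const sql = content.parameters?.sql;
    if (typeof sql !== 'string' || sql.trim() === '') {
      throw badRequest('parameters.sql must be a non-empty string');
    }
  }
  return true;
}

// In validators.js this would end with: module.exports = { validateRequest };
```

Setting status on the thrown error lets a route handler that responds with error.status || 500 return a 400 for malformed requests instead of a generic 500.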

Server Components

A comprehensive SQL MCP server implementation contains these core modules:

  1. Request Handler: Processes incoming requests, validates message format, and routes to appropriate database operation handlers
  2. Query Processor: Parses SQL statements, performs security validation, and manages execution context
  3. Parameter Binding: Securely binds parameters to prevent SQL injection vulnerabilities
  4. Result Formatter: Transforms database result sets into protocol-compliant response messages
  5. Transaction Manager: Maintains transaction state across multiple operations
  6. Schema Inspector: Provides metadata about database objects for context enrichment
  7. Security Layer: Implements access control and query analysis to prevent unauthorized operations

Implementing an SQL MCP Server

Environment Requirements

  • Node.js ≥ 16.x
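  • Database driver for target SQL database (e.g., mssql, mysql2, pg); the examples below use mssql
  • Express.js and the cors middleware for HTTP server functionality
  • uuid for generating message and transaction identifiers
  • Winston or similar for structured logging
  • node-sql-parser for the query validation example
  • Redis (optional) for distributed session management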

Server Implementation

The core server implementation requires several key components:

// sql-mcp-server.js
const express = require('express');
const cors = require('cors');
const winston = require('winston');
const { v4: uuidv4 } = require('uuid');
const { createDatabasePool } = require('./database');
const { validateRequest } = require('./validators');
const { executeOperation } = require('./operations');
 
// Configure logging
const logger = winston.createLogger({
  level: process.env.LOG_LEVEL || 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  transports: [
    new winston.transports.Console(),
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
});
 
// Initialize database connection pool
const dbPool = createDatabasePool({
  server: process.env.DB_SERVER,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
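  // mssql reads timeouts and pool settings from the top level of the config
  connectionTimeout: 30000,
  requestTimeout: 30000,
  pool: {
    max: 10,
    min: 0,
    idleTimeoutMillis: 30000
  },
  options: {
    encrypt: true,
    // Accept self-signed certificates outside production only
    trustServerCertificate: process.env.NODE_ENV !== 'production'
  }
});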
 
const app = express();
app.use(cors());
app.use(express.json({ limit: '10mb' }));
 
// MCP endpoint
app.post('/mcp', async (req, res) => {
  const requestStart = Date.now();
  
  try {
    // Validate request structure
    validateRequest(req.body);
    
    const { message_id, content, metadata } = req.body;
    const { operation, parameters } = content;
    
    logger.info(`Processing SQL operation: ${operation}`, { 
      requestId: message_id,
      sessionId: metadata?.session_id,
      transactionId: metadata?.transaction_id
    });
    
    // Execute database operation
    const result = await executeOperation({
      operation,
      parameters,
      metadata,
      dbPool,
      logger
    });
    
    // Construct MCP response
    const response = {
      message_id: `resp_${uuidv4()}`,
      in_reply_to: message_id,
      timestamp: new Date().toISOString(),
      source: 'sql_server',
      destination: 'model',
      message_type: 'response',
      content: result,
      metadata: {
        ...metadata,
        processing_time_ms: Date.now() - requestStart
      }
    };
    
    res.json(response);
    
  } catch (error) {
    logger.error(`Error processing request: ${error.message}`, {
      stack: error.stack
    });
    
    // Construct error response
    res.status(error.status || 500).json({
      message_id: `err_${uuidv4()}`,
      in_reply_to: req.body?.message_id || 'unknown',
      timestamp: new Date().toISOString(),
      source: 'sql_server',
      destination: 'model',
      message_type: 'error',
      content: {
        error: error.message,
        code: error.code || 'INTERNAL_ERROR',
        sql_state: error.sqlState,
        sql_error_number: error.number
      },
      metadata: {
        processing_time_ms: Date.now() - requestStart
      }
    });
  }
});
 
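const PORT = process.env.PORT || 3000;

// Open the pool before accepting traffic; createDatabasePool returns an unconnected pool
dbPool.connect()
  .then(() => {
    app.listen(PORT, () => {
      logger.info(`SQL MCP Server running on port ${PORT}`);
    });
  })
  .catch((error) => {
    logger.error(`Failed to connect to database: ${error.message}`);
    process.exit(1);
  });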
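To exercise the /mcp endpoint, a client has to produce the same envelope shown in the protocol section. The following is a rough sketch of a client-side builder; the function name buildQueryRequest, its argument names, and the fallback id scheme are hypothetical and chosen only for illustration.

```javascript
// Hypothetical client helper: builds a "query" request envelope for POST /mcp.
function buildQueryRequest({ sql, bindings = {}, options = {}, sessionId, transactionId, id }) {
  // Callers may supply their own id; otherwise derive a simple one from time and randomness
  const messageId = id ?? `req_${Date.now().toString(36)}_${Math.random().toString(36).slice(2, 10)}`;

  const metadata = { session_id: sessionId };
  if (transactionId) {
    // Only attach a transaction id when the query should run inside one
    metadata.transaction_id = transactionId;
  }

  return {
    message_id: messageId,
    timestamp: new Date().toISOString(),
    source: 'model',
    destination: 'sql_server',
    message_type: 'request',
    content: { operation: 'query', parameters: { sql, bindings, options } },
    metadata
  };
}

const exampleRequest = buildQueryRequest({
  sql: 'SELECT product_id, name, price FROM products WHERE category = @category',
  bindings: { '@category': 'electronics' },
  options: { maxRows: 100 },
  sessionId: 'sess_abc123'
});
```

On Node.js 18 or later, the result could be sent with the built-in fetch, for example fetch('http://localhost:3000/mcp', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(exampleRequest) }), assuming the server runs locally on its default port.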

Database Operations Implementation

The core functionality for handling different SQL operations:

// operations.js
const { beginTransaction, commitTransaction, rollbackTransaction, getTransaction } = require('./transactions');
 
async function executeOperation({ operation, parameters, metadata, dbPool, logger }) {
  switch (operation) {
    case 'query':
      return await executeQuery(parameters, metadata, dbPool, logger);
    case 'execute':
      return await executeProcedure(parameters, metadata, dbPool, logger);
    case 'begin_transaction':
      return await beginTransaction(metadata, dbPool, logger);
    case 'commit_transaction':
      return await commitTransaction(metadata, dbPool, logger);
    case 'rollback_transaction':
      return await rollbackTransaction(metadata, dbPool, logger);
    case 'get_schema':
      return await getSchemaInformation(parameters, dbPool, logger);
    default:
      throw new Error(`Unsupported operation: ${operation}`);
  }
}
 
async function executeQuery({ sql, bindings, options }, metadata, dbPool, logger) {
  // Get transaction if one exists
  const transactionId = metadata?.transaction_id;
  const transaction = transactionId ? getTransaction(transactionId) : null;
  
  // Create request object with appropriate transaction context
  const request = transaction ? transaction.request() : dbPool.request();
  
  // Apply timeout if specified
  if (options?.timeout) {
    request.timeout = options.timeout;
  }
  
  // Apply parameter bindings
  if (bindings) {
    Object.entries(bindings).forEach(([param, value]) => {
      // Strip @ prefix if present
      const paramName = param.startsWith('@') ? param.substring(1) : param;
      request.input(paramName, value);
    });
  }
  
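  // Execute query with row count limit if specified
  if (options?.maxRows) {
    // maxRows is concatenated into the batch, so accept only a positive integer
    const maxRows = Number(options.maxRows);
    if (!Number.isInteger(maxRows) || maxRows <= 0) {
      throw new Error('options.maxRows must be a positive integer');
    }
    sql = `SET ROWCOUNT ${maxRows}; ${sql}; SET ROWCOUNT 0;`;
  }
  
  const result = await request.query(sql);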
  
  // Format the result for MCP response
  return {
    recordsets: result.recordsets,
    rowsAffected: result.rowsAffected,
    output: result.output || {},
    returnValue: result.returnValue
  };
}
 
// Additional operation implementations...
 
module.exports = { executeOperation };
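The binding and row-limit handling in executeQuery can be factored into small pure functions that are easy to test without a database. This is a sketch under assumptions: the names normalizeBindings and normalizeMaxRows, the parameter-name pattern, and the hard limit of 1000 rows are all hypothetical choices, not requirements of the mssql driver.

```javascript
// Hypothetical helpers: sanitize client-supplied bindings and row limits.
const PARAM_NAME = /^[A-Za-z_][A-Za-z0-9_]*$/;

// Returns [name, value] pairs with any leading "@" removed from the name
function normalizeBindings(bindings = {}) {
  return Object.entries(bindings).map(([param, value]) => {
    const name = param.startsWith('@') ? param.slice(1) : param;
    if (!PARAM_NAME.test(name)) {
      throw new Error(`Invalid parameter name: ${param}`);
    }
    return [name, value];
  });
}

// Coerces maxRows to a bounded positive integer, or returns null when no limit was requested
function normalizeMaxRows(maxRows, hardLimit = 1000) {
  if (maxRows === undefined || maxRows === null) return null;
  const n = Number(maxRows);
  if (!Number.isInteger(n) || n <= 0) {
    throw new Error(`maxRows must be a positive integer, got: ${maxRows}`);
  }
  return Math.min(n, hardLimit);
}
```

Inside executeQuery, each pair returned by normalizeBindings could be passed to request.input(name, value), and the value from normalizeMaxRows is safe to place in a SET ROWCOUNT statement because it can only ever be an integer.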

Advanced Features

Schema Introspection

Implementing schema introspection allows LLMs to understand database structure:

async function getSchemaInformation({ objects, filter }, dbPool, logger) {
  const objectTypes = objects || ['tables', 'columns', 'views', 'procedures'];
  const results = {};
  
  // Get tables
  if (objectTypes.includes('tables')) {
    const tableQuery = `
      SELECT 
        t.TABLE_SCHEMA as schema_name,
        t.TABLE_NAME as table_name,
        t.TABLE_TYPE as table_type
      FROM 
        INFORMATION_SCHEMA.TABLES t
      WHERE 
        t.TABLE_TYPE = 'BASE TABLE'
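        ${filter ? 'AND t.TABLE_NAME LIKE @filter' : ''}
      ORDER BY 
        t.TABLE_SCHEMA, t.TABLE_NAME
    `;
    
    // Bind the filter as a parameter instead of interpolating it into the SQL text
    const tablesRequest = dbPool.request();
    if (filter) {
      tablesRequest.input('filter', `%${filter}%`);
    }
    const tablesResult = await tablesRequest.query(tableQuery);
    results.tables = tablesResult.recordset;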
  }
  
  // Get columns for tables
  if (objectTypes.includes('columns')) {
    const columnsQuery = `
      SELECT 
        c.TABLE_SCHEMA as schema_name,
        c.TABLE_NAME as table_name,
        c.COLUMN_NAME as column_name,
        c.DATA_TYPE as data_type,
        c.CHARACTER_MAXIMUM_LENGTH as max_length,
        c.IS_NULLABLE as is_nullable,
        COLUMNPROPERTY(OBJECT_ID(c.TABLE_SCHEMA + '.' + c.TABLE_NAME), c.COLUMN_NAME, 'IsIdentity') as is_identity,
        CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 1 ELSE 0 END as is_primary_key
      FROM 
        INFORMATION_SCHEMA.COLUMNS c
      LEFT JOIN (
        SELECT 
          k.TABLE_SCHEMA,
          k.TABLE_NAME,
          k.COLUMN_NAME
        FROM 
          INFORMATION_SCHEMA.KEY_COLUMN_USAGE k
        INNER JOIN 
          INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc ON k.CONSTRAINT_SCHEMA = tc.CONSTRAINT_SCHEMA AND k.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
        WHERE 
          tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
      ) pk ON c.TABLE_SCHEMA = pk.TABLE_SCHEMA AND c.TABLE_NAME = pk.TABLE_NAME AND c.COLUMN_NAME = pk.COLUMN_NAME
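      ${filter ? 'WHERE c.TABLE_NAME LIKE @filter' : ''}
      ORDER BY 
        c.TABLE_SCHEMA, c.TABLE_NAME, c.ORDINAL_POSITION
    `;
    
    // Bind the filter as a parameter instead of interpolating it into the SQL text
    const columnsRequest = dbPool.request();
    if (filter) {
      columnsRequest.input('filter', `%${filter}%`);
    }
    const columnsResult = await columnsRequest.query(columnsQuery);
    results.columns = columnsResult.recordset;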
  }
  
  // Additional schema queries for views, procedures, etc.
  
  return results;
}
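The flat columns recordset is verbose when passed to a model as context. One possible approach is to condense it into one line per table before including it in a prompt. The sketch below assumes rows shaped like the column aliases in the query above (schema_name, table_name, column_name, data_type, max_length, is_nullable, is_identity, is_primary_key); the function name summarizeSchema and the output format are hypothetical.

```javascript
// Hypothetical helper: groups the flat `columns` recordset into one line per table.
function summarizeSchema(columns) {
  const tables = new Map();

  for (const col of columns) {
    const key = `${col.schema_name}.${col.table_name}`;
    if (!tables.has(key)) tables.set(key, []);

    let type = col.data_type;
    if (col.max_length != null) {
      // SQL Server reports -1 for (max) types such as nvarchar(max)
      type += `(${col.max_length === -1 ? 'max' : col.max_length})`;
    }

    const flags = [];
    if (col.is_primary_key) flags.push('PK');
    if (col.is_identity) flags.push('IDENTITY');
    if (col.is_nullable === 'NO') flags.push('NOT NULL');

    tables.get(key).push([col.column_name, type, ...flags].join(' '));
  }

  // Map preserves insertion order, so tables appear in query order
  return [...tables.entries()]
    .map(([name, cols]) => `${name}(${cols.join(', ')})`)
    .join('\n');
}
```

A compact form like this keeps the table and column names the model needs for writing queries while using far fewer tokens than the raw recordset.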

Transaction Management

Implementing stateful transaction management:

// transactions.js
const { v4: uuidv4 } = require('uuid');
 
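// In-memory transaction store. A live transaction is tied to a connection held by this
// process, so in a multi-instance deployment route each session to a single instance;
// Redis can share session metadata but not the transaction itself.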
const transactions = new Map();
 
async function beginTransaction(metadata, dbPool, logger) {
  const transaction = dbPool.transaction();
  const transactionId = `tx_${uuidv4()}`;
  
  await transaction.begin();
  
  // Store transaction with metadata
  transactions.set(transactionId, {
    transaction,
    created: new Date().toISOString(),
    session_id: metadata?.session_id,
    timeout: setTimeout(() => {
      // Auto-rollback if transaction is not committed/rolled back
      if (transactions.has(transactionId)) {
        transaction.rollback()
          .catch(err => logger.error(`Error auto-rolling back transaction: ${err.message}`));
        transactions.delete(transactionId);
        logger.warn(`Transaction ${transactionId} auto-rolled back due to timeout`);
      }
    }, 60000) // 1 minute timeout
  });
  
  return {
    transaction_id: transactionId,
    status: 'active',
    created: new Date().toISOString()
  };
}
 
async function commitTransaction(metadata, dbPool, logger) {
  const transaction_id = metadata?.transaction_id;
  
  if (!transaction_id || !transactions.has(transaction_id)) {
    throw new Error(`Invalid or expired transaction: ${transaction_id}`);
  }
  
  const txInfo = transactions.get(transaction_id);
  
  try {
    await txInfo.transaction.commit();
    
    // Clean up
    clearTimeout(txInfo.timeout);
    transactions.delete(transaction_id);
    
    return {
      transaction_id,
      status: 'committed',
      committed_at: new Date().toISOString()
    };
  } catch (error) {
    logger.error(`Error committing transaction: ${error.message}`);
    throw error;
  }
}
 
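// Rollback, sketched to mirror commitTransaction so the export below resolves
async function rollbackTransaction(metadata, dbPool, logger) {
  const transaction_id = metadata?.transaction_id;
  
  if (!transaction_id || !transactions.has(transaction_id)) {
    throw new Error(`Invalid or expired transaction: ${transaction_id}`);
  }
  
  const txInfo = transactions.get(transaction_id);
  
  // Clean up first so a failed rollback does not leave a stale entry or timer
  clearTimeout(txInfo.timeout);
  transactions.delete(transaction_id);
  
  try {
    await txInfo.transaction.rollback();
    
    return {
      transaction_id,
      status: 'rolled_back',
      rolled_back_at: new Date().toISOString()
    };
  } catch (error) {
    logger.error(`Error rolling back transaction: ${error.message}`);
    throw error;
  }
}
 
// Additional transaction operations...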
 
module.exports = {
  beginTransaction,
  commitTransaction,
  rollbackTransaction,
  getTransaction: (id) => transactions.get(id)?.transaction
};
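The store above records session_id for each transaction, but the lookup in getTransaction does not check it, so any caller who learns a transaction id could use it. A minimal sketch of an ownership check follows, assuming entries shaped like those stored by beginTransaction; the function name resolveTransaction is hypothetical.

```javascript
// Hypothetical guard: a transaction may only be used by the session that opened it.
function resolveTransaction(store, transactionId, sessionId) {
  const entry = store.get(transactionId);
  if (!entry) {
    throw new Error(`Invalid or expired transaction: ${transactionId}`);
  }
  if (entry.session_id !== sessionId) {
    throw new Error(`Transaction ${transactionId} belongs to a different session`);
  }
  return entry.transaction;
}

// Usage with a stand-in store; a real entry would hold an mssql Transaction object
const demoStore = new Map([
  ['tx_1', { transaction: { name: 'stand-in transaction' }, session_id: 'sess_a' }]
]);
const demoTransaction = resolveTransaction(demoStore, 'tx_1', 'sess_a');
```

Taking the store as an argument keeps the function independent of module state, which makes it straightforward to test with a plain Map.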

Security Implementation

Query Whitelisting

Implementing query whitelisting for enhanced security:

// security.js
const { Parser } = require('node-sql-parser');
 
// node-sql-parser exposes a Parser class; astify() turns SQL text into an AST
const parser = new Parser();
 
// Configuration for query whitelisting
const securityConfig = {
  // Allowed table access patterns
  allowedTables: {
    'public': ['users', 'products', 'orders'],
    'analytics': ['*'] // Allow all tables in analytics schema
  },
  // Disallowed SQL features
  disallowedFeatures: ['insert', 'update', 'delete', 'create', 'drop', 'alter'],
  // Maximum query complexity (joins, subqueries, etc.)
  maxComplexity: 5
};
 
function validateQuery(sql) {
  try {
    // Parse SQL to AST using the SQL Server dialect
    const ast = parser.astify(sql, { database: 'TransactSQL' });
    
    // Reject batches; astify() may return an array even for a single statement
    const statements = Array.isArray(ast) ? ast : [ast];
    if (statements.length !== 1) {
      throw new Error('Multiple SQL statements are not allowed');
    }
    
    // Check for disallowed statement types
    const statementType = statements[0].type.toLowerCase();
    if (securityConfig.disallowedFeatures.includes(statementType)) {
      throw new Error(`SQL operation not permitted: ${statementType}`);
    }
    
    // Validate table access
    const tables = extractTablesFromAST(ast);
    for (const table of tables) {
      const { schema, name } = parseTableIdentifier(table);
      
      // Check if schema is allowed
      if (!securityConfig.allowedTables[schema]) {
        throw new Error(`Access to schema '${schema}' is not permitted`);
      }
      
      // Check if table is allowed ('*' permits every table in the schema)
      const allowedNames = securityConfig.allowedTables[schema];
      if (!allowedNames.includes('*') && !allowedNames.includes(name)) {
        throw new Error(`Access to table '${schema}.${name}' is not permitted`);
      }
    }
    
    // Validate query complexity
    const complexity = calculateQueryComplexity(ast);
    if (complexity > securityConfig.maxComplexity) {
      throw new Error(`Query complexity score (${complexity}) exceeds maximum allowed (${securityConfig.maxComplexity})`);
    }
    
    return true;
  } catch (error) {
    throw new Error(`SQL validation error: ${error.message}`);
  }
}
 
// Helper functions for extraction and validation...
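The validation code calls parseTableIdentifier and checks the result against allowedTables, but the helpers themselves are not shown. One possible shape for them is sketched below. It assumes table references arrive as dotted strings such as analytics.events or [analytics].[events], and that unqualified names fall into a default schema; both assumptions, along with the isTableAllowed name, are hypothetical.

```javascript
// Hypothetical helpers for the allowlist check; parseTableIdentifier mirrors the name used above.
function parseTableIdentifier(identifier, defaultSchema = 'public') {
  // Drop [bracket], "double-quote", and `backtick` quoting, then split on dots.
  // Limitation: quoted names that themselves contain a dot are not supported.
  const parts = identifier
    .split('.')
    .map(part => part.trim().replace(/^[\[`"]|[\]`"]$/g, ''));

  if (parts.length === 1) {
    return { schema: defaultSchema, name: parts[0] };
  }
  // For database.schema.table, keep only the last two parts
  const [schema, name] = parts.slice(-2);
  return { schema, name };
}

// True when the schema is configured and lists either '*' or the table name
function isTableAllowed(allowedTables, schema, name) {
  const allowed = allowedTables[schema];
  if (!Array.isArray(allowed)) return false;
  return allowed.includes('*') || allowed.includes(name);
}
```

Checking Array.isArray before reading the list also rejects schema names such as constructor that would otherwise resolve to inherited object properties.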

Performance Optimization

Connection Pooling Configuration

Optimizing database connection management:

// database.js
const sql = require('mssql');
 
function createDatabasePool(config) {
  // Apply performance optimizations
  const optimizedConfig = {
    ...config,
    // mssql expects pool settings at the top level of the config, not under options
    pool: {
      ...config.pool,
      max: parseInt(process.env.DB_POOL_MAX || '10', 10),
      min: parseInt(process.env.DB_POOL_MIN || '0', 10),
      idleTimeoutMillis: parseInt(process.env.DB_POOL_IDLE_TIMEOUT || '30000', 10),
      acquireTimeoutMillis: parseInt(process.env.DB_POOL_ACQUIRE_TIMEOUT || '15000', 10),
      createTimeoutMillis: parseInt(process.env.DB_POOL_CREATE_TIMEOUT || '30000', 10),
      destroyTimeoutMillis: parseInt(process.env.DB_POOL_DESTROY_TIMEOUT || '5000', 10),
      reapIntervalMillis: parseInt(process.env.DB_POOL_REAP_INTERVAL || '1000', 10),
      createRetryIntervalMillis: parseInt(process.env.DB_POOL_RETRY_INTERVAL || '200', 10)
    },
    options: {
      ...config.options,
      enableArithAbort: true,
      trustServerCertificate: process.env.NODE_ENV !== 'production'
    }
  };
 
  return new sql.ConnectionPool(optimizedConfig);
}
 
module.exports = { createDatabasePool };
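Reading pool sizes with parseInt yields NaN when an environment variable is set to a non-numeric value, and that NaN would be passed straight to the pool. A small guard can fall back to the default instead. The helper below is a sketch; the name envInt and the choice to reject negative values are assumptions.

```javascript
// Hypothetical helper: parse an integer setting, falling back when unset or malformed.
function envInt(env, name, fallback) {
  const raw = env[name];
  if (raw === undefined || raw === '') return fallback;
  const value = Number.parseInt(raw, 10);
  // Reject NaN and negative numbers; pool sizes and timeouts cannot be negative
  return Number.isNaN(value) || value < 0 ? fallback : value;
}

// Usage with an explicit object; in the server this would be process.env
const demoEnv = { DB_POOL_MAX: '25', DB_POOL_MIN: 'many' };
const demoPool = {
  max: envInt(demoEnv, 'DB_POOL_MAX', 10),
  min: envInt(demoEnv, 'DB_POOL_MIN', 0),
  idleTimeoutMillis: envInt(demoEnv, 'DB_POOL_IDLE_TIMEOUT', 30000)
};
```

Passing the environment as an argument rather than reading process.env directly keeps the helper testable and makes the fallback behavior explicit.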

Query Result Streaming

Implementing streaming for large result sets:

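async function executeStreamingQuery({ sql, bindings, options }, dbPool, res) {
  const request = dbPool.request();
  
  // mssql emits row events on the request itself once streaming is enabled
  request.stream = true;
  
  // Apply bindings
  if (bindings) {
    Object.entries(bindings).forEach(([param, value]) => {
      const paramName = param.startsWith('@') ? param.substring(1) : param;
      request.input(paramName, value);
    });
  }
  
  // Start JSON response
  res.writeHead(200, { 'Content-Type': 'application/json' });
  res.write('{"message_type":"streaming_response","content":{"records":[');
  
  let firstRow = true;
  let rowCount = 0;
  let limitReached = false;
  let finished = false;
  
  // Close the JSON document exactly once; 'done' can still fire after 'error'
  const finish = (tail) => {
    if (finished) return;
    finished = true;
    res.write(tail);
    res.end();
  };
  const completeTail = () => `],"row_count":${rowCount},"complete":true}}`;
  
  request.on('recordset', columns => {
    // We could send schema information here if needed
  });
  
  request.on('row', row => {
    if (limitReached) return;
    if (!firstRow) {
      res.write(',');
    }
    firstRow = false;
    
    res.write(JSON.stringify(row));
    rowCount++;
    
    // Check if we've hit row limit
    if (options?.maxRows && rowCount >= options.maxRows) {
      limitReached = true;
      request.cancel();
    }
  });
  
  request.on('error', err => {
    // Cancelling at the row limit surfaces as an error; treat that as a normal end
    if (limitReached) {
      finish(completeTail());
      return;
    }
    // JSON.stringify escapes quotes and newlines in the driver's error message
    finish(`],"error":${JSON.stringify(err.message)},"row_count":${rowCount}}}`);
  });
  
  request.on('done', result => {
    // Finish JSON response
    finish(completeTail());
  });
  
  // Start the query only after all listeners are attached
  request.query(sql);
}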
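The framing logic in the streaming handler (opening the document, placing commas between rows, closing it once) can be isolated from the database driver so that it can be tested on its own. The sketch below takes any write function, such as res.write bound to the response; the name createRecordWriter and its row/end interface are hypothetical.

```javascript
// Hypothetical framing helper: emits rows as one JSON document, chunk by chunk.
function createRecordWriter(write) {
  let rowCount = 0;
  let closed = false;

  // Open the envelope and the records array immediately
  write('{"message_type":"streaming_response","content":{"records":[');

  return {
    // Append one record, prefixing a comma for every record after the first
    row(record) {
      if (closed) return;
      write((rowCount > 0 ? ',' : '') + JSON.stringify(record));
      rowCount++;
    },
    // Close the document once; later calls are ignored
    end(error) {
      if (closed) return;
      closed = true;
      const tail = error
        ? `"error":${JSON.stringify(error.message)}`
        : '"complete":true';
      write(`],"row_count":${rowCount},${tail}}}`);
    }
  };
}

// Usage with an in-memory sink instead of an HTTP response
const demoChunks = [];
const demoWriter = createRecordWriter(chunk => demoChunks.push(chunk));
demoWriter.row({ product_id: 1, name: 'Keyboard' });
demoWriter.row({ product_id: 2, name: 'Monitor "Pro"' });
demoWriter.end();
const demoDocument = JSON.parse(demoChunks.join(''));
```

Because every fragment passes through JSON.stringify, the concatenated chunks always parse as valid JSON, even when row values or error messages contain quotes.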

Conclusion

SQL Database MCP servers give AI systems a structured, controlled way to work with relational databases. A standardized protocol layer lets developers build secure, efficient channels between LLMs and SQL databases: the model can run meaningful data operations while the server enforces security boundaries. The layer hides database-specific complexity and supplies schema context that helps the model reason about the data.

Likely next steps for SQL MCP servers are tighter integration with AI systems, better query understanding, and stronger security controls. The implementation details covered in this article (request handling, parameter binding, transaction management, query validation, and result streaming) provide a starting point for building robust, scalable database interfaces for LLM applications.