ivanfuhr/ingestor

Composer install command:

composer require ivanfuhr/ingestor

Ingestor

Ingestor is a PHP library for safe, auditable data imports with isolated staging, atomic release, and an extensible pipeline.

Data enters through a source driver, is transformed into mutations by a definition, is applied in an isolated stage by a persistence driver, and is only then promoted to production — safely and atomically.

Requires PHP 8.2+ and the PDO extension.

Installation

⚡️ Get started by requiring the package using Composer:

composer require ivanfuhr/ingestor

Quick Start

use Ivanfuhr\Ingestor\Ingestor;
use Ivanfuhr\Ingestor\Driver\Persistence\PostgresDriver;
use Ivanfuhr\Ingestor\Driver\Source\CsvDriver;

$ingestor = Ingestor::make(
    persistence: new PostgresDriver($pdo),
    source: new CsvDriver(),
);

$import = $ingestor
    ->for(CustomerImport::class)
    ->from('/path/to/customers.csv')
    ->import();

if ($import->hasFailures()) {
    foreach ($import->failures() as $failure) {
        // inspect validation or persistence failures
    }

    $import->rollback();

    return;
}

$import->release();

🏗️ Architecture

Ingestor separates four responsibilities:

Source
    ↓
Source Driver
    ↓
Iterable<RowContext>
    ↓
Definition (prepare → validate → map)
    ↓
Dataset (mutations)
    ↓
Persistence Driver
    ↓
Stage (isolated)
    ↓
Release (atomic promotion)

| Driver | Responsibility | Implementations |
|---|---|---|
| Source | Turns a source into input rows | CsvDriver |
| Persistence | Creates staging, persists mutations, and releases | PostgresDriver |

Drivers are injected at construction time. The import pipeline never needs to know how data is read or written.

$ingestor = Ingestor::make(
    persistence: new PostgresDriver($pdo),
    source: new CsvDriver(),
);

Why: Keeps reading, transformation, and persistence independent — each piece can be swapped or tested in isolation.
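
To make that separation concrete, here is a minimal in-memory sketch of the same shape in plain PHP. It does not use Ingestor's classes: `RowSource`, `Sink`, `ArraySource`, `ArraySink`, and `runPipeline` are hypothetical names invented for this illustration, and the real contracts may look different.

```php
<?php
declare(strict_types=1);

// Hypothetical contracts (not part of Ingestor): one side reads rows, the other writes mutations.
interface RowSource
{
    /** @return iterable<int, array<string, mixed>> rows keyed by source line number */
    public function rows(): iterable;
}

interface Sink
{
    /** @param array<string, mixed> $data */
    public function write(string $dataset, array $data): void;
}

final class ArraySource implements RowSource
{
    /** @param array<int, array<string, mixed>> $rows */
    public function __construct(private array $rows) {}

    public function rows(): iterable
    {
        yield from $this->rows;
    }
}

final class ArraySink implements Sink
{
    /** @var array<string, list<array<string, mixed>>> */
    public array $written = [];

    public function write(string $dataset, array $data): void
    {
        $this->written[$dataset][] = $data;
    }
}

/**
 * The pipeline only sees the two interfaces and a mapping callable.
 * $map turns one row into a list of [dataset, data] pairs.
 */
function runPipeline(RowSource $source, Sink $sink, callable $map): int
{
    $mutations = 0;
    foreach ($source->rows() as $row) {
        foreach ($map($row) as [$dataset, $data]) {
            $sink->write($dataset, $data);
            $mutations++;
        }
    }

    return $mutations;
}

$sink = new ArraySink();
$count = runPipeline(
    new ArraySource([2 => ['cpf' => '1', 'name' => 'Ada', 'city' => 'SP']]),
    $sink,
    fn (array $row): array => [
        ['customers', ['document' => $row['cpf'], 'name' => $row['name']]],
        ['addresses', ['document' => $row['cpf'], 'city' => $row['city']]],
    ],
);
```

Because `runPipeline` depends only on the interfaces, a test can pass array-backed doubles while production code passes file- and database-backed implementations.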

📋 Definitions & Schema

A Definition describes an import. It declares structure via Schema and transforms each row into write intentions via Dataset.

use Ivanfuhr\Ingestor\Contract\Definition;
use Ivanfuhr\Ingestor\Contract\Context;
use Ivanfuhr\Ingestor\Dataset\Dataset;
use Ivanfuhr\Ingestor\Schema\Schema;
use Ivanfuhr\Ingestor\Stage\EmptyStage;
use Ivanfuhr\Ingestor\Stage\PrefilledStage;
use Ivanfuhr\Ingestor\Conflict\UpdateOnConflict;

final class CustomerImport implements Definition
{
    public function schema(): Schema
    {
        return Schema::make()
            ->dataset('customers')
                ->using(PrefilledStage::class)
                ->onConflict(UpdateOnConflict::by('document'))
            ->dataset('addresses')
                ->using(EmptyStage::class);
    }

    public function map(array $row, Context $context): Dataset
    {
        return Dataset::make()
            ->insert('customers', [
                'document' => $row['cpf'],
                'name' => $row['name'],
            ])
            ->insert('addresses', [
                'document' => $row['cpf'],
                'city' => $row['city'],
            ]);
    }
}

Stage Strategies

| Strategy | Behavior |
|---|---|
| EmptyStage | Dataset starts empty |
| PrefilledStage | Dataset starts with a copy of existing data (ideal for incremental updates) |

Conflict Strategies

Declared in the Schema and translated by the persistence driver:

UpdateOnConflict::by('document');
IgnoreOnConflict::by('document');
ReplaceOnConflict::by('document');
FailOnConflict::by('document');
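
The PostgreSQL Driver section states that these strategies are applied via ON CONFLICT. As a rough sketch of what such a translation could look like, the hypothetical helper below builds the trailing clause of an INSERT statement. The function name `conflictClause` and the strategy strings `'update'`, `'ignore'`, and `'fail'` are assumptions standing in for the strategy classes, not the driver's actual code. `ReplaceOnConflict` is left out because the README does not describe how it differs from an update.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns the clause to append to an INSERT statement.
 *
 * @param list<string> $columns columns being inserted
 */
function conflictClause(string $strategy, string $key, array $columns): string
{
    // Quote identifiers the PostgreSQL way: wrap in double quotes, double any inner quote.
    $quote = static fn (string $name): string => '"' . str_replace('"', '""', $name) . '"';

    switch ($strategy) {
        case 'ignore':
            return sprintf('ON CONFLICT (%s) DO NOTHING', $quote($key));

        case 'update':
            $assignments = [];
            foreach ($columns as $column) {
                if ($column !== $key) {
                    $assignments[] = sprintf('%1$s = EXCLUDED.%1$s', $quote($column));
                }
            }

            // With no non-key columns there is nothing to update.
            return $assignments === []
                ? sprintf('ON CONFLICT (%s) DO NOTHING', $quote($key))
                : sprintf('ON CONFLICT (%s) DO UPDATE SET %s', $quote($key), implode(', ', $assignments));

        case 'fail':
            // No clause: a duplicate key raises a unique violation.
            return '';

        default:
            throw new InvalidArgumentException("Unknown strategy: {$strategy}");
    }
}
```

PostgreSQL requires a unique index or constraint on the conflict target, so under this sketch `document` would need one on the target table.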

A Stage is an isolated ingestion environment. Nothing touches production until release() is called.

Import
└── Stage
    ├── customers (staging table)
    └── addresses (staging table)

Why: One row can produce zero, one, or many mutations across multiple datasets — without coupling business logic to SQL.

🗂️ Context

Shared storage available throughout an import. Use it to preload ID maps, caches, and reference data so map() stays pure and fast.

use Ivanfuhr\Ingestor\Contract\Preparable;

final class OrderImport implements Definition, Preparable
{
    public function prepare(Context $context): void
    {
        $context->put('customers', Customer::pluck('id', 'document')->all());
    }

    public function map(array $row, Context $context): Dataset
    {
        $customers = $context->get('customers');

        return Dataset::make()->insert('orders', [
            'customer_id' => $customers[$row['document']] ?? null,
            'total' => $row['total'],
        ]);
    }
}

Why: Avoids N+1 queries during import. I/O belongs in prepare(); map() should be a pure Row + Context → Dataset transformation.

✅ Validation

Row validation is optional and runs before mapping. Implement ValidatesRows on your definition:

use Ivanfuhr\Ingestor\Contract\ValidatesRows;
use Ivanfuhr\Ingestor\Validation\Failure;

final class CustomerImport implements Definition, ValidatesRows
{
    public function validate(array $row, Context $context): iterable
    {
        if (empty($row['document'])) {
            yield Failure::error('document')
                ->message('Document is required.');
        }

        if (empty($row['phone'])) {
            yield Failure::warning('phone')
                ->message('Phone number is empty.');
        }
    }
}

| Severity | Behavior |
|---|---|
| ERROR | Row is skipped: not mapped or persisted |
| WARNING | Recorded, but the row continues through the pipeline |
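
The two severities can be illustrated with a small standalone sketch. It is not Ingestor's implementation: the `Severity` enum and `partitionRows` function are hypothetical, and failures are modeled as plain `[severity, message]` pairs rather than `Failure` objects.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the library's severity levels.
enum Severity
{
    case Error;
    case Warning;
}

/**
 * Runs $validate on every row, records all failures, and keeps only rows without an Error.
 *
 * @param array<int, array<string, mixed>> $rows rows keyed by source line number
 * @param callable $validate yields [Severity, message] pairs for one row
 * @return array{accepted: array<int, array<string, mixed>>, failures: list<array<string, mixed>>}
 */
function partitionRows(array $rows, callable $validate): array
{
    $accepted = [];
    $failures = [];

    foreach ($rows as $line => $row) {
        $hasError = false;
        foreach ($validate($row) as [$severity, $message]) {
            $failures[] = ['line' => $line, 'severity' => $severity, 'message' => $message];
            $hasError = $hasError || $severity === Severity::Error;
        }

        // Warnings are recorded above but do not stop the row.
        if (!$hasError) {
            $accepted[$line] = $row;
        }
    }

    return ['accepted' => $accepted, 'failures' => $failures];
}

$validate = static function (array $row): iterable {
    if (empty($row['document'])) {
        yield [Severity::Error, 'Document is required.'];
    }
    if (empty($row['phone'])) {
        yield [Severity::Warning, 'Phone number is empty.'];
    }
};

$result = partitionRows([
    2 => ['document' => '123', 'phone' => ''],
    3 => ['document' => '', 'phone' => '555'],
], $validate);
```

Here line 2 is accepted with a warning and line 3 is rejected, so `$result['failures']` holds two entries while `$result['accepted']` holds one row.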

Failures are available after import:

$import->failures();
$import->hasFailures();

Why: Invalid rows are caught early, before any database writes, with full reporting for audits and reprocessing.

🚨 Persistence Failures

Database errors (NOT NULL, FOREIGN KEY, UNIQUE, etc.) are exposed through the same Failure mechanism, with additional context:

  • line() — original source line number
  • dataset() — affected dataset
  • data() — row data
  • cause() — underlying exception

Failures do not trigger an automatic rollback. You decide between release() and rollback().

$import = $ingestor
    ->for(CustomerImport::class)
    ->from($file)
    ->import();

if ($import->hasFailures()) {
    foreach ($import->failures() as $failure) {
        dump([
            'line' => $failure->line(),
            'dataset' => $failure->dataset(),
            'message' => $failure->message(),
            'data' => $failure->data(),
        ]);
    }

    $import->rollback();
    return;
}

$import->release();

SQL Failure Modes

PostgresDriver supports configurable failure diagnosis:

use Ivanfuhr\Ingestor\Driver\Persistence\SqlFailureMode;

new PostgresDriver($pdo, chunkSize: 500, failureMode: SqlFailureMode::Diagnostic);

| Mode | Priority |
|---|---|
| Fast | Throughput: records a batch failure when a bulk INSERT fails |
| Diagnostic | Traceability: subdivides the batch to isolate the failing row |

Why: Every mutation inherits its source row context, so persistence errors remain traceable even at scale.
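
The idea of subdividing a batch can be sketched as a recursive bisection. This is an assumption about the general technique, not the driver's actual code: `insertWithDiagnosis` is a hypothetical name, and the sketch assumes each call to `$insert` is all-or-nothing (for example a single multi-row INSERT, wrapped in a savepoint when running inside a transaction).

```php
<?php
declare(strict_types=1);

/**
 * Tries the whole batch first; on failure, splits it in half and retries each half
 * until the failing rows are isolated.
 *
 * @param array<int, array<string, mixed>> $batch rows keyed by source line number
 * @param callable $insert receives a batch and throws if any row in it fails
 * @return array<int, Throwable> failures keyed by source line number
 */
function insertWithDiagnosis(array $batch, callable $insert): array
{
    if ($batch === []) {
        return [];
    }

    try {
        $insert($batch);

        return [];
    } catch (Throwable $e) {
        if (count($batch) === 1) {
            // A single-row batch failed: this is the offending row.
            return [array_key_first($batch) => $e];
        }
    }

    // Split while preserving the line-number keys.
    $half = intdiv(count($batch), 2);
    $left = array_slice($batch, 0, $half, true);
    $right = array_slice($batch, $half, null, true);

    return insertWithDiagnosis($left, $insert) + insertWithDiagnosis($right, $insert);
}

// In-memory stand-in for a bulk INSERT with a NOT NULL constraint on "name".
$stored = [];
$insert = function (array $rows) use (&$stored): void {
    foreach ($rows as $row) {
        if ($row['name'] === null) {
            throw new RuntimeException('NOT NULL violation on "name"');
        }
    }
    $stored += $rows; // reached only when the whole batch is valid
};

$failures = insertWithDiagnosis([
    2 => ['name' => 'Ada'],
    3 => ['name' => null],
    4 => ['name' => 'Bob'],
    5 => ['name' => 'Cy'],
], $insert);
```

The trade-off matches the table: a clean batch costs one statement, while a batch with a bad row costs extra round trips in exchange for knowing exactly which source line failed.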

🔗 Hooks

High-level lifecycle hooks for auditing, metrics, notifications, and external integrations. They run a fixed number of times regardless of row volume.

beforeImport()
    ↓
prepare()
    ↓
validate() → map() → persist()
    ↓
afterImport()
    ↓
release()
    ↓
beforeRelease() → promote stage → afterRelease()

| Interface | When | Typical use |
|---|---|---|
| BeforeImport | Before import starts | Timers, logging, audit trail |
| AfterImport | After all rows processed, before release | Metrics, reports, notifications |
| BeforeRelease | Immediately before promotion | Final checks, manual approval |
| AfterRelease | After promotion | Cache invalidation, external sync |

BeforeRelease can block publication:

use Ivanfuhr\Ingestor\Exception\CannotRelease;

public function beforeRelease(ImportedImport $import): void
{
    if ($import->hasFailures()) {
        throw CannotRelease::because('Import contains unresolved failures.');
    }
}

Why: Integrate with the outside world without per-row callbacks that would destroy throughput.

📊 Metrics

Read-only metrics collected during import. Available whether you release or roll back.

$metrics = $import->metrics();

$metrics->startedAt();
$metrics->finishedAt();
$metrics->duration();

$metrics->rows();          // rows processed
$metrics->importedRows();  // rows imported successfully
$metrics->failedRows();    // rows with failures
$metrics->mutations();     // mutations produced

foreach ($metrics->datasets() as $dataset) {
    $dataset->name();
    $dataset->mutations();
    $dataset->persisted();
    $dataset->failures();
}

Failures answer what and why. Metrics answer how much and how long.

Why: Every import becomes observable — performance, throughput, and per-dataset breakdowns without affecting the pipeline.

🧪 Testing Utilities

Test definitions in isolation — no database, no CSV files, no external infrastructure.

Asserting the Schema

use Ivanfuhr\Ingestor\Ingestor;

Ingestor::test(CustomerImport::class)
    ->assertDataset('customers')
    ->assertStage(PrefilledStage::class)
    ->assertUpdateOnConflict('document');

Asserting map()

Ingestor::test(CustomerImport::class)
    ->withContext(['customers' => ['12345678901' => 1]])
    ->map(['cpf' => '12345678901', 'name' => 'Ada', 'city' => 'SP'])
    ->assertInserted('customers', [
        'document' => '12345678901',
        'name' => 'Ada',
    ])
    ->assertDatasetCount('addresses', 1);

Asserting Validation

Ingestor::test(CustomerImport::class)
    ->map(['document' => null])
    ->assertFailure(field: 'document', message: 'Document is required.')
    ->assertFailureCount(1);

Asserting the Full Pipeline

Ingestor::test(CustomerImport::class)
    ->fromRows([
        ['cpf' => '1', 'name' => 'Ada', 'city' => 'SP'],
        ['cpf' => '2', 'name' => 'Bob', 'city' => 'RJ'],
    ])
    ->import()
    ->assertRows(2)
    ->assertImportedRows(2)
    ->assertFailedRows(0)
    ->assertMutations(4);

Why: Definitions should be fully testable with fast, deterministic tests — safe to refactor without spinning up infrastructure.

🐘 PostgreSQL Driver

PostgresDriver creates staging tables, inserts data in configurable batches, and atomically promotes staging to production.

use Ivanfuhr\Ingestor\Driver\Persistence\PostgresDriver;
use Ivanfuhr\Ingestor\Driver\Persistence\SqlFailureMode;

$driver = new PostgresDriver(
    pdo: $pdo,
    chunkSize: 500,
    failureMode: SqlFailureMode::Fast,
);

The driver introspects production tables to build matching staging tables and applies conflict strategies from the Schema via ON CONFLICT.

Why: Staging + atomic swap gives you a safe rollback window before data ever reaches production.
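
The README does not show the SQL the driver issues. As one possible way to implement staging and an atomic swap in PostgreSQL, the sketch below generates statements that clone a table's structure, optionally prefill it, and later swap it in by renaming inside a transaction. The function names and the `_stage` / `_old` suffixes are hypothetical, and the real driver may promote data differently.

```php
<?php
declare(strict_types=1);

// Quote an identifier the PostgreSQL way.
function quoteIdent(string $name): string
{
    return '"' . str_replace('"', '""', $name) . '"';
}

/**
 * Statements that create a staging copy of $table.
 * $prefill mirrors the PrefilledStage idea: start from the existing rows.
 *
 * @return list<string>
 */
function stageStatements(string $table, bool $prefill): array
{
    $prod = quoteIdent($table);
    $stage = quoteIdent($table . '_stage');

    // LIKE ... INCLUDING ALL copies columns, defaults, constraints, and indexes.
    $sql = ["CREATE TABLE {$stage} (LIKE {$prod} INCLUDING ALL)"];
    if ($prefill) {
        $sql[] = "INSERT INTO {$stage} SELECT * FROM {$prod}";
    }

    return $sql;
}

/**
 * Statements that swap the staging table into place.
 * DDL is transactional in PostgreSQL, so readers see either the old or the new table.
 *
 * @return list<string>
 */
function promoteStatements(string $table): array
{
    $prod = quoteIdent($table);
    $stage = quoteIdent($table . '_stage');
    $old = quoteIdent($table . '_old');

    return [
        'BEGIN',
        "ALTER TABLE {$prod} RENAME TO {$old}",
        "ALTER TABLE {$stage} RENAME TO {$prod}",
        "DROP TABLE {$old}",
        'COMMIT',
    ];
}
```

A rename-based swap is only a starting point: foreign keys, views, and sequences that reference the original table follow the renamed object, so a production implementation would need to handle them explicitly.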

📄 CSV Driver

CsvDriver reads CSV files with a header row and yields RowContext objects with line numbers and associative data.

use Ivanfuhr\Ingestor\Driver\Source\CsvDriver;

$ingestor = Ingestor::make($persistence, new CsvDriver());

Why: Line numbers flow through the entire pipeline, enabling precise failure reporting back to the source file.
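
For readers curious how a header-based reader can carry line numbers, here is a minimal sketch using PHP's built-in `fgetcsv`. It is not `CsvDriver`'s source: `readCsvRows` is a hypothetical function that yields plain arrays keyed by line number instead of `RowContext` objects, and it assumes no quoted field spans multiple physical lines.

```php
<?php
declare(strict_types=1);

/**
 * Reads a CSV stream whose first line is a header and yields
 * line number => associative row.
 *
 * @param resource $handle an open, readable stream
 * @return Generator<int, array<string, string|null>>
 */
function readCsvRows($handle): Generator
{
    $header = fgetcsv($handle, null, ',', '"', '\\');
    if ($header === false) {
        return; // empty input: nothing to yield
    }

    $line = 1; // the header occupies line 1
    while (($fields = fgetcsv($handle, null, ',', '"', '\\')) !== false) {
        $line++;

        if ($fields === [null]) {
            continue; // fgetcsv reports a blank line as a single null field
        }

        if (count($fields) !== count($header)) {
            throw new UnexpectedValueException(
                "Line {$line}: expected " . count($header) . ' fields, got ' . count($fields)
            );
        }

        yield $line => array_combine($header, $fields);
    }
}

// Usage with an in-memory stream; a real file would use fopen($path, 'r').
$handle = fopen('php://memory', 'w+');
fwrite($handle, "cpf,name,city\n1,Ada,SP\n\n2,Bob,RJ\n");
rewind($handle);

$rows = iterator_to_array(readCsvRows($handle));
fclose($handle);
```

The blank third line is skipped but still counted, so the second record keeps its true line number (4) for failure reporting.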

🛠️ Development

composer test          # PHPUnit
composer lint          # PHP-CS-Fixer (check)
composer lint:fix      # PHP-CS-Fixer (fix)
composer phpstan       # Static analysis
composer rector        # Automated refactoring

Community

License

Ingestor was created by Ivan Führ under the MIT license.

Other information

  • License: MIT
  • Last updated: 2026-06-15