beam-postgres


Light IO transforms for Postgres read/write in Apache Beam pipelines.

Goal

The project aims to provide highly performant and customizable transforms; it is not intended to support many different SQL database engines.

Features

  • `ReadAllFromPostgres`, `ReadFromPostgres` and `WriteToPostgres` transforms
  • Records can be mapped to tuples, dictionaries or dataclasses
  • Reads and writes are in configurable batches
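
The record-mapping feature follows psycopg's row-factory idea: the same fetched row can be surfaced as a tuple, a dictionary, or a dataclass. A minimal stdlib sketch of that mapping (the `Record` class and column names here are illustrative, not part of the library's API):

```python
from dataclasses import dataclass


@dataclass
class Record:
    id: int
    data: str


# A fetched row arrives as a plain tuple, in column order.
columns = ("id", "data")
row = (1, "example")

as_tuple = row                       # tuple mapping: positional access
as_dict = dict(zip(columns, row))    # dict mapping: access by column name
as_dataclass = Record(**as_dict)     # dataclass mapping: typed attributes
```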

Usage

Printing data from a database table:

import apache_beam as beam
from psycopg.rows import dict_row

from beam_postgres.io import ReadAllFromPostgres

with beam.Pipeline() as p:
    data = p | "Reading example records from database" >> ReadAllFromPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        "select id, data from source",
        dict_row,
    )
    data | "Writing to stdout" >> beam.Map(print)

Writing data to a database table:

from dataclasses import dataclass

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres.io import WriteToPostgres


@dataclass
class Example:
    data: str


with beam.Pipeline(options=PipelineOptions()) as p:
    data = p | "Reading example records" >> beam.Create(
        [
            Example("example1"),
            Example("example2"),
        ]
    )
    data | "Writing example records to database" >> WriteToPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        "insert into sink (data) values (%(data)s)",
    )

See here for more examples.

Reading in batches

When a table holds more data than fits into memory, you can read it in batches. You can see example code here (it reads records in batches of 1).
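Conceptually, a batched read fetches `batch_size` rows at a time instead of materializing the whole result set. A stdlib sketch of the idea, where `FakeCursor` stands in for a database cursor's `fetchmany` behavior (names are illustrative, not the library's API):

```python
from typing import Iterator, List, Tuple


class FakeCursor:
    """Stands in for a DB cursor; fetchmany returns up to `size` rows."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0

    def fetchmany(self, size: int):
        batch = self._rows[self._pos:self._pos + size]
        self._pos += len(batch)
        return batch


def iter_batches(cursor, batch_size: int) -> Iterator[List[Tuple]]:
    """Yield rows in batches of `batch_size` until the cursor is exhausted."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            return
        yield batch


cursor = FakeCursor([(1, "a"), (2, "b"), (3, "c")])
batches = list(iter_batches(cursor, batch_size=2))
# Two batches: the first holds two rows, the second holds the remaining one.
```

Keeping only one batch in memory at a time is what lets a pipeline process tables larger than available RAM.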
