Examples

Real-world examples of using Genro-Toolbox.

Genro Integration

Genro-Toolbox was designed to support Genro Kyō. Here’s how it’s used:

from genro_toolbox import extract_kwargs

class Service:
    @extract_kwargs(logging=True, async_mode=True, plugins=True)
    def __init__(
        self,
        name=None,
        logging_kwargs=None,
        async_mode_kwargs=None,
        plugins_kwargs=None,
        **kwargs
    ):
        self.name = name or "service"

        # Setup logging plugin
        if logging_kwargs:
            self.plug(LoggingPlugin, **logging_kwargs)

        # Setup async mode
        if async_mode_kwargs:
            self.plug(AsyncPlugin, **async_mode_kwargs)

        # Custom plugins
        if plugins_kwargs:
            for plugin_name, plugin_config in plugins_kwargs.items():
                self.plug(plugin_name, **plugin_config)

# Usage
svc = Service(
    name="my_service",
    logging_level="DEBUG",
    logging_mode="silent",
    async_mode_enabled=True,
    async_mode_pool_size=10
)

Web Framework Configuration

from genro_toolbox import extract_kwargs

class WebApplication:
    @extract_kwargs(server=True, database=True, cache=True, auth=True)
    def __init__(
        self,
        name: str,
        server_kwargs=None,
        database_kwargs=None,
        cache_kwargs=None,
        auth_kwargs=None
    ):
        self.name = name

        # Server configuration
        self.server = self._setup_server(server_kwargs or {})

        # Database configuration
        self.db = self._setup_database(database_kwargs or {})

        # Cache configuration
        self.cache = self._setup_cache(cache_kwargs or {})

        # Auth configuration
        self.auth = self._setup_auth(auth_kwargs or {})

    def _setup_server(self, config):
        from werkzeug.serving import run_simple
        return {
            'host': config.get('host', '0.0.0.0'),
            'port': config.get('port', 8000),
            'threaded': config.get('threaded', True)
        }

    def _setup_database(self, config):
        from sqlalchemy import create_engine
        url = config.get('url', 'sqlite:///app.db')
        return create_engine(url, **config.get('options', {}))

    def _setup_cache(self, config):
        backend = config.get('backend', 'memory')
        if backend == 'redis':
            import redis
            return redis.Redis(**config.get('redis_options', {}))
        return {}

    def _setup_auth(self, config):
        return {
            'secret_key': config.get('secret_key', 'change-me'),
            'algorithm': config.get('algorithm', 'HS256'),
            'expiration': config.get('expiration', 3600)
        }

# Create application
app = WebApplication(
    name="myapp",
    server_host="0.0.0.0",
    server_port=8080,
    server_threaded=True,
    database_url="postgresql://localhost/mydb",
    cache_backend="redis",
    cache_redis_host="localhost",
    cache_redis_port=6379,
    auth_secret_key="my-secret-key",
    auth_expiration=7200
)

CLI Tool Configuration

import click
from genro_toolbox import extract_kwargs

class CLITool:
    @extract_kwargs(output=True, logging=True, performance=True)
    def __init__(
        self,
        output_kwargs=None,
        logging_kwargs=None,
        performance_kwargs=None
    ):
        self.output_config = output_kwargs or {}
        self.logging_config = logging_kwargs or {}
        self.perf_config = performance_kwargs or {}

    def run(self, data):
        # Apply configurations
        pass

@click.command()
@click.option('--output-format', default='json')
@click.option('--output-pretty/--output-compact', default=True)
@click.option('--logging-level', default='INFO')
@click.option('--logging-file', default=None)
@click.option('--performance-measure/--no-performance-measure', default=False)
def main(**kwargs):
    tool = CLITool(**kwargs)
    tool.run(data)

Data Pipeline

from genro_toolbox import extract_kwargs

class DataPipeline:
    @extract_kwargs(source=True, transform=True, sink=True, monitoring=True)
    def __init__(
        self,
        name: str,
        source_kwargs=None,
        transform_kwargs=None,
        sink_kwargs=None,
        monitoring_kwargs=None
    ):
        self.name = name

        # Setup source
        self.source = self._create_source(source_kwargs or {})

        # Setup transformations
        self.transforms = self._create_transforms(transform_kwargs or {})

        # Setup sink
        self.sink = self._create_sink(sink_kwargs or {})

        # Setup monitoring
        if monitoring_kwargs:
            self.monitor = self._create_monitor(monitoring_kwargs)

    def _create_source(self, config):
        source_type = config.get('type', 'file')
        if source_type == 'file':
            return FileSource(config.get('path'))
        elif source_type == 'database':
            return DatabaseSource(config.get('connection_string'))
        elif source_type == 'api':
            return APISource(config.get('endpoint'))

    def _create_transforms(self, config):
        transforms = []
        if config.get('normalize'):
            transforms.append(NormalizeTransform())
        if config.get('filter'):
            transforms.append(FilterTransform(config['filter']))
        if config.get('aggregate'):
            transforms.append(AggregateTransform(config['aggregate']))
        return transforms

    def _create_sink(self, config):
        sink_type = config.get('type', 'stdout')
        if sink_type == 'file':
            return FileSink(config.get('path'))
        elif sink_type == 'database':
            return DatabaseSink(config.get('connection_string'))
        return StdoutSink()

    def _create_monitor(self, config):
        return Monitor(
            metrics_enabled=config.get('metrics', True),
            tracing_enabled=config.get('tracing', False),
            logging_enabled=config.get('logging', True)
        )

# Usage
pipeline = DataPipeline(
    name="etl_pipeline",
    source_type="database",
    source_connection_string="postgresql://localhost/source_db",
    transform_normalize=True,
    transform_filter={"status": "active"},
    transform_aggregate={"by": "category"},
    sink_type="file",
    sink_path="/output/results.json",
    monitoring_metrics=True,
    monitoring_tracing=True
)

See Also