EzDevInfo.com

luigi

Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.

Using Parameters in python luigi

I am triggering Luigi via

luigi.run(["--local-scheduler"], main_task_cls=Test(Server = ActiveServer, Database = DB))   

and in my class I have:

class Test(luigi.Task):

    Database = luigi.Parameter()
    Server = luigi.Parameter()

but the Test task does not seem to parse the parameters I am passing to it.

I am getting:

MissingParameterException: No value for 'Server' (--Server) submitted and no default value has been assigned.

Source: (StackOverflow)

Moving a tsv file from local file system to S3 in luigi

The following program does not output anything, nor does it throw any errors. Am I missing something, such as a run() method in the to_S3 class?

class to_S3(luigi.Task):

    # Mysql_to_tsv runs a query against a MySQL database and stores the result as a TSV in a local file.

    def requires(self):
        return [Mysql_to_tsv]

    def output(self):
        return luigi.S3Target("https://s3.amazonaws.com/bucket-name/luigi_attempt.tsv")

The output() method of the Mysql_to_tsv() class is:

def output(self):
    return luigi.LocalTarget('/Users/user/Desktop/Work/Luigi/test_data.tsv')

Please help with the correct class implementation of the task.


Source: (StackOverflow)

Luigi parameter default values and mocks

I am trying to mock something that supplies a default value for a luigi parameter.

A dumb example showing what I'm trying to accomplish:

Task under test:

import luigi
from bar import Bar

bar = Bar()

class Baz(luigi.Task):

    qux = luigi.Parameter(default=bar.bar())

    def baz(self):
        return self.qux

    def foo(self):
        return bar.bar()

Unit Test code:

import unittest
from mock import Mock, patch
from sut.baz import Baz

class TestMocking(unittest.TestCase):

    def test_baz_bar(self):
        self.assertEquals("bar", Baz().baz())

    @patch('sut.baz.bar')
    def test_patched_baz(self, mock_bar):
        mock_bar.bar = Mock(return_value="foo")
        self.assertEquals("foo", (Baz().baz()))

    @patch('sut.baz.bar')
    def test_patched_foo(self, mock_bar):
        mock_bar.bar = Mock(return_value="foo")
        self.assertEquals("foo", (Baz().foo()))

It appears that the luigi.Parameter logic happens earlier than the patch.

In this example, test_patched_foo passes and test_patched_baz fails. So the patch does happen, but happens after the call from the luigi.Parameter(default=bar.bar()) line.

Is it possible to mock and patch something called in this manner?


Source: (StackOverflow)
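
The timing described in the mocking question can be reproduced without Luigi: an expression in a class body (such as a parameter default) is evaluated once, when the class is defined, whereas a call inside a method is evaluated on every invocation. The sketch below uses plain Python and unittest.mock; the Eager and Lazy classes are illustrative stand-ins, not Luigi tasks.

```python
from unittest import mock


class Bar:
    def bar(self):
        return "bar"


bar = Bar()


class Eager:
    # Evaluated once, when the class body runs (i.e. at import time).
    qux = bar.bar()

    def baz(self):
        return self.qux


class Lazy:
    def baz(self):
        # Evaluated on every call, so a patch applied later is visible.
        return bar.bar()


with mock.patch.object(bar, "bar", return_value="foo"):
    eager_result = Eager().baz()
    lazy_result = Lazy().baz()

print(eager_result)  # bar: the value was captured before the patch
print(lazy_result)   # foo
```

In a test of the Luigi task itself, passing the value explicitly, as in Baz(qux="foo"), sidesteps the captured default altogether.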

writing output to files from python luigi

I just tried to run the python luigi example from the documentation:

class TaskA(luigi.Task):
    def output(self):
        return luigi.LocalTarget('xyz')

class FlipLinesBackwards(luigi.Task):
    def requires(self):
        return TaskA()

    def output(self):
        return luigi.LocalTarget('abc')

    def run(self):
        f = self.input().open('r') # this will return a file stream that reads from "xyz"
        g = self.output().open('w')
        for line in f:
           g.write('%s\n', ''.join(reversed(line.strip().split())))
        g.close() # needed because files are atomic

I ran it using command line:

python Luigi_Test.py FlipLinesBackwards --local-scheduler

I was under the impression that this would create a file in the directory I am running it from, but it does not.

Am I doing something wrong?


Source: (StackOverflow)

When using luigi to build a Hadoop job pipeline, how do I debug my script?

I am using luigi to build a Hadoop job pipeline. I have read the examples and the docs, but I can't find out how to debug my script before I push the jobs to the Hadoop servers.

More specifically, I need to process a large data set and the run time is long, so I would prefer to test the jobs on a small test data set rather than on the real one.


Source: (StackOverflow)
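
One way to approach the debugging question above, assuming the job logic is written as a mapper and a reducer (the shape Luigi's Hadoop job tasks use), is to exercise those two functions in-process on a handful of lines before anything is submitted to the cluster. The word-count logic, the run_locally helper and the sample data below are all hypothetical:

```python
from itertools import groupby
from operator import itemgetter


def mapper(line):
    # Emit (word, 1) for each whitespace-separated token.
    for word in line.strip().split():
        yield word, 1


def reducer(key, values):
    # Sum the counts collected for one key.
    yield key, sum(values)


def run_locally(lines):
    """Imitate map -> sort/shuffle -> reduce in-process on a small sample."""
    mapped = [pair for line in lines for pair in mapper(line)]
    mapped.sort(key=itemgetter(0))
    results = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        results.extend(reducer(key, (value for _, value in group)))
    return results


sample = ["a b a", "b c"]
print(run_locally(sample))  # [('a', 2), ('b', 2), ('c', 1)]
```

Because the functions are plain generators, they can be stepped through with a debugger or covered by ordinary unit tests, and the same bodies can then be moved into the task's mapper and reducer methods.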

How do I write a Luigi workflow for loading data into Redshift from MySQL?

Does Luigi have support for me to execute the entire flow?

The flow would be something like this: dump the MySQL data to S3, then load it into Redshift using the COPY command.

Can I execute the above workflow using Luigi?


Source: (StackOverflow)