luigi
Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in.
I am triggering Luigi via
luigi.run(["--local-scheduler"], main_task_cls=Test(Server = ActiveServer, Database = DB))
and in my class I have:
class Test(luigi.Task):
    Database = luigi.Parameter()
    Server = luigi.Parameter()
but the Test task can't seem to parse the parameters that I'm feeding it properly.
I am getting:
MissingParameterException: No value for 'Server' (--Server) submitted and no default value has been assigned.
Source: (StackOverflow)
The following program does not output anything, nor does it throw any errors. Am I missing something, such as a run() method in the to_S3() class?
class to_S3(luigi.Task):
    # The class Mysql_to_tsv converts the data returned by a query on a Mysqldb and stores the data in a tsv in a local file.
    def requires(self):
        return [Mysql_to_tsv]

    def output(self):
        return luigi.S3Target("https://s3.amazonaws.com/bucket-name/luigi_attempt.tsv")
The output() method of the Mysql_to_tsv() class is:
def output(self):
    return luigi.LocalTarget('/Users/user/Desktop/Work/Luigi/test_data.tsv')
Please help with the correct class implementation of the task.
Source: (StackOverflow)
I am trying to mock something that supplies a default value for a luigi parameter.
A dumb example showing what I'm trying to accomplish:
Task under test:
import luigi
from bar import Bar
bar = Bar()
class Baz(luigi.Task):
    qux = luigi.Parameter(default=bar.bar())

    def baz(self):
        return self.qux

    def foo(self):
        return bar.bar()
Unit Test code:
import unittest
from mock import Mock, patch
from sut.baz import Baz
class TestMocking(unittest.TestCase):
    def test_baz_bar(self):
        self.assertEquals("bar", Baz().baz())

    @patch('sut.baz.bar')
    def test_patched_baz(self, mock_bar):
        mock_bar.bar = Mock(return_value="foo")
        self.assertEquals("foo", (Baz().baz()))

    @patch('sut.baz.bar')
    def test_patched_foo(self, mock_bar):
        mock_bar.bar = Mock(return_value="foo")
        self.assertEquals("foo", (Baz().foo()))
It appears that the luigi.Parameter logic happens earlier than the patch.
In this example, test_patched_foo passes and test_patched_baz fails. So the patch does happen, but it happens after the call in the luigi.Parameter(default=bar.bar()) line.
Is it possible to mock and patch something called in this manner?
Source: (StackOverflow)
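The behaviour described in the mocking question above follows from ordinary Python evaluation order: the expression bar.bar() inside the class body runs once, when the module is imported, so a patch applied later inside a test cannot change the value that was already captured. The sketch below reproduces this with the standard library only. Param is a hypothetical stand-in for luigi.Parameter that just stores its default, and the demo function is invented for the example; it is not Luigi's implementation.

```python
from unittest.mock import patch


class Bar:
    def bar(self):
        return "bar"


bar = Bar()


class Param:
    """Hypothetical stand-in for luigi.Parameter: it only stores its default."""

    def __init__(self, default):
        self.default = default


class Baz:
    # bar.bar() runs once, while the class body executes (import time).
    qux = Param(default=bar.bar())

    def baz(self):
        return self.qux.default

    def foo(self):
        # bar.bar is looked up on every call, so a later patch is visible.
        return bar.bar()


def demo():
    with patch.object(bar, "bar", return_value="foo"):
        late = Baz().foo()   # sees the patch
        early = Baz().baz()  # still the value captured at class creation
    with patch.object(Baz.qux, "default", "foo"):
        patched_default = Baz().baz()  # patching the stored value does work
    return late, early, patched_default
```

In a real Luigi test, the simplest way to sidestep the captured default is usually to pass the value explicitly, for example Baz(qux="foo"), since an explicit argument takes precedence over the default.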
I just tried to run the python luigi example from the documentation:
class TaskA(luigi.Task):
    def output(self):
        return luigi.LocalTarget('xyz')

class FlipLinesBackwards(luigi.Task):
    def requires(self):
        return TaskA()

    def output(self):
        return luigi.LocalTarget('abc')

    def run(self):
        f = self.input().open('r')  # this will return a file stream that reads from "xyz"
        g = self.output().open('w')
        for line in f:
            # write() takes a single string, so format it with % first
            g.write('%s\n' % ''.join(reversed(line.strip().split())))
        g.close()  # needed because files are atomic
I ran it using command line:
python Luigi_Test.py FlipLinesBackwards --local-scheduler
I was under the impression that this would create a file in the directory I am running it in but it doesn't?
Am I doing something wrong?
Source: (StackOverflow)
I am using luigi as a Hadoop job pipeline. I have read the examples and docs, but I can't find out how to debug my script before I push the jobs to the Hadoop servers.
More specifically, I need to process a big data set and the run time is long, so I would prefer to test the jobs on a small test data set rather than run them on the real data.
Source: (StackOverflow)
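One general way to approach the debugging question above, independent of any Luigi feature, is to keep the map and reduce logic in plain generator functions and exercise them in memory on a few sample lines before submitting anything to the cluster. The sketch below assumes a word-count style job. The function names and the run_locally helper are hypothetical, and the sort plus groupby step only imitates what Hadoop's shuffle does.

```python
from itertools import groupby
from operator import itemgetter


def mapper(line):
    """Emit (word, 1) for every word on a line."""
    for word in line.strip().split():
        yield word, 1


def reducer(key, values):
    """Sum the counts collected for one key."""
    yield key, sum(values)


def run_locally(lines):
    """Imitate map -> shuffle/sort -> reduce in memory on a small sample."""
    mapped = [pair for line in lines for pair in mapper(line)]
    mapped.sort(key=itemgetter(0))
    results = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        results.extend(reducer(key, (value for _, value in group)))
    return results


if __name__ == "__main__":
    sample = ["a b a", "b c"]
    print(run_locally(sample))  # [('a', 2), ('b', 2), ('c', 1)]
```

Luigi's Hadoop support also includes a local job runner intended for this kind of testing; consult the luigi.contrib.hadoop documentation for the installed version to see how to enable it.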
Does Luigi have support for me to execute the entire flow?
The flow would be something like this:
Dumping MySQL data to S3, then moving the data to Redshift using the Copy command.
Can I execute the above workflow using Luigi?
Source: (StackOverflow)