tyluigiutils package
Submodules
tyluigiutils.external module
class tyluigiutils.external.ExternalFileTask(*args, **kwargs)
Bases: luigi.task.ExternalTask
Use this task to model a dependency on a file that is created outside of Luigi. The task has no run() method, so Luigi cannot produce the file itself: if the expected file is not present, the task fails, which is the only sensible outcome in this case.
hdfs = <luigi.parameter.Parameter object>
output()
The output that this Task produces.
The output of the Task determines whether the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.
Implementation note: if running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output, since they don't see the work done by other workers.
See Task.output.
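To make the "finished if and only if all outputs exist" rule concrete, here is a stdlib-only sketch of such a completeness check. LocalFileTarget, flatten and is_complete are hypothetical stand-ins, not Luigi classes; the sketch also treats a task without outputs as incomplete, which to our knowledge mirrors Luigi's default complete(). For an ExternalFileTask, a False result is what makes the task fail, since there is no run() method to create the file.

```python
import os
import tempfile


class LocalFileTarget:
    """Hypothetical stand-in for a Luigi file target (not a Luigi class)."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)


def flatten(outputs):
    """Normalize output() results: None, a single target, or a list of targets."""
    if outputs is None:
        return []
    if isinstance(outputs, (list, tuple)):
        return list(outputs)
    return [outputs]


def is_complete(outputs):
    """Finished if and only if there is at least one output and every output exists."""
    targets = flatten(outputs)
    if not targets:
        return False  # a task without outputs is never considered complete
    return all(target.exists() for target in targets)


with tempfile.TemporaryDirectory() as tmp:
    present = os.path.join(tmp, "present.csv")
    open(present, "w").close()
    missing = os.path.join(tmp, "missing.csv")
    single_ok = is_complete(LocalFileTarget(present))
    mixed_ok = is_complete([LocalFileTarget(present), LocalFileTarget(missing)])

print(single_ok, mixed_ok)  # True False
```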
output_path = <luigi.parameter.Parameter object>
tyluigiutils.hadoop module
class tyluigiutils.hadoop.JsonInputJobTask(*args, **kwargs)
Bases: luigi.contrib.hadoop.JobTask
Helper class for a Hadoop job that reads JSON input, optionally validated by a schema.
class tyluigiutils.hadoop.JsonJobTask(*args, **kwargs)
Bases: tyluigiutils.hadoop.JsonInputJobTask, tyluigiutils.hadoop.JsonOutputJobTask
Hadoop job that reads and writes JSON lines. Keys are plain text, but values are JSON-encoded. The job fails if JSON decoding or encoding fails for any input or output line.
Optionally, input and/or output can be validated against a JSON schema. To do so, override the input_schema and/or output_schema methods.
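The line format described above can be sketched with the standard json module. This assumes the Hadoop streaming convention of a tab between key and value; decode_line, encode_line and the require_score validator are hypothetical names, with the validator standing in for a real JSON-schema check such as the one input_schema/output_schema would enable.

```python
import json


def decode_line(line, validate=None):
    """Parse one 'key<TAB>json-value' line into (key, value).

    Raises ValueError for a missing tab or invalid JSON. `validate` is a
    hypothetical hook standing in for schema validation of the input.
    """
    key, sep, raw_value = line.rstrip("\n").partition("\t")
    if not sep:
        raise ValueError("missing tab separator in %r" % line)
    value = json.loads(raw_value)  # json.JSONDecodeError is a ValueError subclass
    if validate is not None:
        validate(value)
    return key, value


def encode_line(key, value, validate=None):
    """Serialize (key, value) to a line; json.dumps raises TypeError for non-JSON values."""
    if validate is not None:
        validate(value)
    return "%s\t%s" % (key, json.dumps(value, sort_keys=True))


def require_score(record):
    """Hypothetical validator: a stand-in for a real JSON-schema check."""
    if "score" not in record:
        raise ValueError("record has no 'score' field")


key, record = decode_line('hotel-42\t{"score": 4.5, "reviews": 120}\n', validate=require_score)
out = encode_line(key, {"score": record["score"]})
```

Raising on the first bad line, rather than skipping it, matches the "fails if decoding or encoding fails" behavior described above.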
class tyluigiutils.hadoop.JsonOutputJobTask(*args, **kwargs)
Bases: luigi.contrib.hadoop.JobTask
Helper class for a Hadoop job that writes JSON output, optionally validated by a schema.
class tyluigiutils.hadoop.TYDefaultHadoopJobRunner(**kwargs)
Bases: luigi.contrib.hadoop.HadoopJobRunner
class tyluigiutils.hadoop.VenvJobTask(*args, **kwargs)
Bases: luigi.contrib.hadoop.JobTask
Hadoop job that runs in a virtual environment.
It uses the venv_path setting in the [hadoop] section of your Luigi configuration file. If venv_path is a zip file, it is used directly as the archive. If it is a directory, it is compressed and shipped with the Hadoop job. The venv can also be located directly on HDFS; use the hdfs:// URL prefix to indicate this.
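The three venv_path cases above can be sketched as a small decision function. resolve_venv_archive is a hypothetical helper, not part of the package API; it assumes an hdfs:// path already points at a zipped venv, and it zips a local directory so that its contents (e.g. bin/python) sit at the archive root.

```python
import os
import shutil
import tempfile
import zipfile


def resolve_venv_archive(venv_path, tmp_dir):
    """Return the archive to ship with the job (hypothetical helper, not the package API).

    - hdfs:// URLs are assumed to point at an archive already on HDFS and are used as-is.
    - A local .zip file is used directly.
    - A local directory is zipped into tmp_dir, with its contents at the archive root.
    """
    if venv_path.startswith("hdfs://"):
        return venv_path
    if os.path.isfile(venv_path) and venv_path.endswith(".zip"):
        return venv_path
    if os.path.isdir(venv_path):
        name = os.path.basename(os.path.normpath(venv_path))
        # make_archive appends ".zip" and returns the path of the created archive
        return shutil.make_archive(os.path.join(tmp_dir, name), "zip", root_dir=venv_path)
    raise ValueError("not an hdfs:// URL, .zip file or directory: %r" % venv_path)


with tempfile.TemporaryDirectory() as tmp:
    venv_dir = os.path.join(tmp, "myvenv")
    os.makedirs(os.path.join(venv_dir, "bin"))
    open(os.path.join(venv_dir, "bin", "python"), "w").close()
    out_dir = os.path.join(tmp, "out")
    os.makedirs(out_dir)
    archive = resolve_venv_archive(venv_dir, out_dir)
    archive_name = os.path.basename(archive)
    with zipfile.ZipFile(archive) as zf:
        members = zf.namelist()
    reused = resolve_venv_archive(archive, out_dir)  # an existing .zip is returned unchanged
```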
job_runner()
Get the MapReduce runner for this job.
If all outputs are HdfsTargets, the DefaultHadoopJobRunner will be used. Otherwise, the LocalJobRunner, which streams all data through the local machine, will be used (useful for testing).
libjars = None
venv_path = <luigi.parameter.Parameter object>
venv_tmp = None
tyluigiutils.spark module
class tyluigiutils.spark.BaseTYPySparkTask(*args, **kwargs)
Bases: luigi.contrib.spark.PySparkTask
atomic_output = <luigi.parameter.BoolParameter object>
main(sc, *args)
Called by the pyspark_runner with a SparkContext and any arguments returned by app_options().
Parameters:
- sc: the SparkContext.
- args: the arguments list.
write_csv_output(*args, **kwargs)
Writes CSV output from a DataFrame.
Parameters:
- df: a DataFrame.
- sc: a SparkContext.
- sep: the field separator.
- quote: sets the single character used for escaping quoted values where the separator can be part of the value. If None is set, the default value, " (double quote), is used. To turn off quoting, set an empty string.
- escape: sets the single character used for escaping quotes inside an already quoted value. If None is set, the default value is used.
- escapeQuotes: a flag indicating whether values containing quotes should always be enclosed in quotes. If None is set, the default value, true, is used, which escapes all values containing a quote character.
Returns: None
class tyluigiutils.spark.SparkVenvJobTask(*args, **kwargs)
Bases: luigi.contrib.spark.PySparkTask
archives
conf
program_environment()
Override this method to control environment variables for the program.
Returns: a dict mapping environment variable names to values.
set_pyspark_python_in_env
If set to True, the PYSPARK_PYTHON variable is set as part of the task environment. Its value is {venv_name}/bin/python, where the venv name is taken from the venv_name property. This also assumes that the venv directory is available in the working directory. True by default.
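A program_environment() override that honors this flag might look like the following sketch. build_program_environment is a hypothetical function written outside Luigi so it runs on its own; it copies the base environment and adds PYSPARK_PYTHON pointing at {venv_name}/bin/python, relative to the working directory.

```python
import os


def build_program_environment(venv_name, set_pyspark_python_in_env=True, base_env=None):
    """Hypothetical sketch of a program_environment() override.

    Starts from a copy of base_env (os.environ by default) and, when the flag is
    set, points PYSPARK_PYTHON at {venv_name}/bin/python, relative to the
    working directory where the venv is assumed to be unpacked.
    """
    env = dict(os.environ if base_env is None else base_env)
    if set_pyspark_python_in_env:
        env["PYSPARK_PYTHON"] = "{}/bin/python".format(venv_name)
    return env


env = build_program_environment("myvenv", base_env={"HOME": "/home/luigi"})
print(env)  # {'HOME': '/home/luigi', 'PYSPARK_PYTHON': 'myvenv/bin/python'}
```

Copying the base environment rather than returning only the new variable keeps PATH, HOME and similar settings visible to the launched program.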
set_pyspark_python_spark_conf
If set to True, the property spark.yarn.appMasterEnv.PYSPARK_PYTHON is also set. This requires PYSPARK_PYTHON to be available on the cluster nodes as well. Normally the task takes care of this, but you may want to double-check the path of the unzipped files. True by default.
To ensure this works properly, the path {venv_name}/bin/python should be available both on the machine running the driver (in client mode) and on the workers and application master (in cluster mode). This is generally not necessary when working in client mode.
Example: the venv is created in the myvenv directory inside the current working directory (i.e. where the application is launched). It should then be packaged as myvenv.zip, which should contain bin/python at its root:
    $ unzip -l -qq myvenv.zip
    ...
    437 2017-08-31 15:13 bin/python
    ...
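Before shipping an archive, it can help to check the layout shown in the unzip listing above. The sketch below uses only zipfile; venv_zip_has_python, pyspark_python_conf and in_memory_zip are hypothetical helpers. The "nested" case illustrates a likely packaging mistake (zipping the parent directory instead of the venv's contents), under the assumption that the archive is unpacked into a directory named after the venv.

```python
import io
import zipfile


def venv_zip_has_python(zip_source):
    """True if bin/python sits at the archive root, matching the unzip listing above."""
    with zipfile.ZipFile(zip_source) as zf:
        return "bin/python" in zf.namelist()


def pyspark_python_conf(venv_name):
    """Spark property pointing the YARN application master at the venv interpreter."""
    return {"spark.yarn.appMasterEnv.PYSPARK_PYTHON": "{}/bin/python".format(venv_name)}


def in_memory_zip(*member_names):
    """Build a small zip in memory so the check can run without a real venv."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        for name in member_names:
            zf.writestr(name, "placeholder")
    return buf


good_layout = venv_zip_has_python(in_memory_zip("bin/python", "lib/site.py"))
nested_layout = venv_zip_has_python(in_memory_zip("myvenv/bin/python"))
print(good_layout, nested_layout)  # True False
print(pyspark_python_conf("myvenv"))
```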
venv_name
venv_path
Returns the path (hdfs:// or local) of the zipped venv to use.
Implement this so that it returns the path, which can be taken from a configuration file or from a Luigi Task parameter.
Returns: the path (string) of the zipped venv file.
tyluigiutils.spark.normalize_local_filename(filename)
Normalizes a full local file path so that it can be used with Python's os and file I/O functions.
Parameters: filename: the full file path.
Returns: the normalized filename.
tyluigiutils.spark.prepend_paths(*paths)
Prepends the given paths to the Python path before calling the function. This is useful when a function that requires a package runs on a worker: the worker might not have the right PYTHONPATH, even though the packages can be found in the working directory.
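A decorator with this behavior can be sketched with functools and sys.path. This is an illustration, not the package's source: prepend_sys_paths is a hypothetical name, and the sketch assumes "prepend" means moving each path to the front of sys.path and leaving it there after the call.

```python
import functools
import sys


def prepend_sys_paths(*paths):
    """Sketch of a decorator that puts `paths` at the front of sys.path before each call.

    The paths stay on sys.path afterwards, so later imports on the same
    worker also see them.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Insert in reverse so the final order on sys.path matches `paths`.
            for path in reversed(paths):
                if path in sys.path:
                    sys.path.remove(path)  # move an existing entry to the front
                sys.path.insert(0, path)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@prepend_sys_paths("/hypothetical/libs", "/hypothetical/venv_packages")
def leading_entries():
    return sys.path[:2]


print(leading_entries())  # ['/hypothetical/libs', '/hypothetical/venv_packages']
```

Doing the insertion inside the wrapper, rather than at decoration time, matters on Spark: the function body executes on the worker, so that is where sys.path has to change.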
tyluigiutils.spark.write_decorator(write_function)
Decorator for Spark output writers.
This decorator first writes to a temporary file. If the write succeeds, it moves the temporary file to the final name. This is necessary because Spark might fail at the saveAs* stage after the final file has already been created, which would prevent Luigi from scheduling the task again after the failure.
Parameters: write_function: a writer function that saves the output.
Returns: the wrapper function.
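The write-to-temporary-then-move idea can be sketched on a local filesystem with os.replace. The decorator name atomic_write and the writer signature write_function(data, path) are assumptions for illustration; with Spark writing to HDFS, the final move would need an HDFS rename instead of os.replace.

```python
import functools
import os
import shutil
import tempfile


def atomic_write(write_function):
    """Sketch of the write-to-temp-then-move pattern (hypothetical, local filesystem only)."""
    @functools.wraps(write_function)
    def wrapper(data, final_path):
        parent = os.path.dirname(os.path.abspath(final_path))
        tmp_path = os.path.join(parent, "." + os.path.basename(final_path) + ".tmp")
        try:
            write_function(data, tmp_path)
        except Exception:
            # Remove partial output so the final name never appears after a failure.
            if os.path.isdir(tmp_path):
                shutil.rmtree(tmp_path)
            elif os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
        os.replace(tmp_path, final_path)  # rename within the same filesystem
    return wrapper


@atomic_write
def write_lines(lines, path):
    with open(path, "w") as fh:
        fh.write("\n".join(lines))


@atomic_write
def failing_writer(lines, path):
    with open(path, "w") as fh:
        fh.write("partial")
    raise RuntimeError("simulated failure at the save stage")


with tempfile.TemporaryDirectory() as tmp:
    good = os.path.join(tmp, "good.txt")
    bad = os.path.join(tmp, "bad.txt")
    write_lines(["a", "b"], good)
    try:
        failing_writer(["a"], bad)
    except RuntimeError:
        pass
    good_exists, bad_exists = os.path.exists(good), os.path.exists(bad)
    leftovers = sorted(os.listdir(tmp))
```

Because the final name only appears after a successful write, Luigi still sees the output as missing after a failure and will schedule the task again.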
tyluigiutils.sqoop module
Gather data using Sqoop table dumps run on RDBMS databases. Adapted from: https://github.com/edx/edx-analytics-pipeline/blob/master/edx/analytics/tasks/sqoop.py
class tyluigiutils.sqoop.SqoopImportFromPgSQL(*args, **kwargs)
Bases: tyluigiutils.sqoop.SqoopImportTask
An abstract task that uses Sqoop to read data out of a PostgreSQL database and write it to a file in CSV or Avro format.
Known issues:
- Sqoop does not handle the Postgres uuid type well, so uuid columns need to be cast to strings when dumping. Be aware of this when splitting (split_by), as splitting on such a column will be slow for large databases. For splitting, it is better to use a numeric index, or not to split at all.
import_args()
Returns the list of arguments specific to a Sqoop import from a PostgreSQL database.
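As an illustration of what "import-specific arguments" can look like, here is a hypothetical builder using standard Sqoop import flags (--table, --target-dir, --split-by, --num-mappers) and the PostgreSQL connector's --schema option, which Sqoop expects after a bare "--" separator. The function name and the mapping from task parameters to flags are assumptions, not the package's implementation.

```python
def pg_import_args(table_name, destination, schema=None, split_by=None, num_mappers=None):
    """Hypothetical builder for the import-specific part of a PostgreSQL Sqoop command."""
    args = ["--table", table_name, "--target-dir", destination]
    if split_by:
        # Prefer a numeric column here; splitting on a uuid cast to text is slow.
        args += ["--split-by", split_by]
    if num_mappers:
        args += ["--num-mappers", str(num_mappers)]
    if schema:
        # Connector-specific options follow a bare "--", so they must come last.
        args += ["--", "--schema", schema]
    return args


print(pg_import_args("reviews", "/data/reviews", schema="public", split_by="id", num_mappers=4))
```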
schema = <luigi.parameter.Parameter object>
class tyluigiutils.sqoop.SqoopImportRunner
Bases: luigi.contrib.hadoop.JobRunner
Runs a SqoopImportTask by shelling out to sqoop.
class tyluigiutils.sqoop.SqoopImportTask(*args, **kwargs)
Bases: luigi.contrib.hadoop.BaseHadoopJobTask
An abstract task that uses Sqoop to read data out of a database and write it to a file in one of several formats. To protect the database access credentials, they are loaded from an external file, which can be secured appropriately. By default the credentials can be read from the configuration (bad practice), but the _get_credentials method can be overridden to use a more secure approach.
Known issues:
- Parquet output does not work with free-form queries, so use it only when dumping a whole table. This is expected to be solved in Sqoop 1.4.7.
as_avro = <luigi.parameter.BoolParameter object>
as_parquet = <luigi.parameter.BoolParameter object>
atomic_output = <luigi.parameter.BoolParameter object>
columns = <luigi.parameter.Parameter object>
columns_map = <luigi.parameter.Parameter object>
database = <luigi.parameter.Parameter object>
delimiter_replacement = <luigi.parameter.Parameter object>
destination = <luigi.parameter.Parameter object>
fields_terminated_by = <luigi.parameter.Parameter object>
generic_args(password_target)
Returns the list of arguments used by all Sqoop commands, using credentials read from a file.
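The point of reading credentials from a file is that the password never appears on the command line. The hypothetical builder below shows the shape of such connection arguments using Sqoop's --connect, --username, --password-file and --verbose options; the PostgreSQL JDBC URL format, the function name, and the idea that password_target corresponds to the --password-file path are assumptions.

```python
import shlex


def generic_sqoop_args(host, port, database, username, password_file, verbose=False):
    """Hypothetical builder for the connection arguments shared by all Sqoop commands."""
    args = [
        "--connect", "jdbc:postgresql://{}:{}/{}".format(host, port, database),
        "--username", username,
        # Only the path is passed; Sqoop reads the password from this file.
        "--password-file", password_file,
    ]
    if verbose:
        args.append("--verbose")
    return args


command = ["sqoop", "import"] + generic_sqoop_args(
    "db.example.com", 5432, "reviews", "etl", "/user/etl/.sqoop.password"
) + ["--table", "reviews", "--target-dir", "/data/reviews"]
print(shlex.join(command))
```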
host = <luigi.parameter.Parameter object>
null_string = <luigi.parameter.Parameter object>
num_mappers = <luigi.parameter.Parameter object>
output()
The output that this Task produces.
The output of the Task determines whether the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.
Implementation note: if running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output, since they don't see the work done by other workers.
See Task.output.
password = <luigi.parameter.Parameter object>
port = <luigi.parameter.Parameter object>
query = <luigi.parameter.Parameter object>
split_by = <luigi.parameter.Parameter object>
table_name = <luigi.parameter.Parameter object>
username = <luigi.parameter.Parameter object>
verbose = <luigi.parameter.BoolParameter object>
where = <luigi.parameter.Parameter object>
tyluigiutils.url module
Adapted from https://github.com/edx/edx-analytics-pipeline/blob/master/edx/analytics/tasks/url.py
Support for URLs. Specifically, this module makes it possible to refer to data stored in a variety of locations and formats using a standard URL syntax. Examples:
    s3://some-bucket/path/to/file
    /path/to/local/file.gz
    hdfs://some/directory/
    file://some/local/directory
class tyluigiutils.url.ExternalURL(*args, **kwargs)
Bases: luigi.task.ExternalTask
Simple task that returns a target based on its URL.
output()
The output that this Task produces.
The output of the Task determines whether the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.
Implementation note: if running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output, since they don't see the work done by other workers.
See Task.output.
url = <luigi.parameter.Parameter object>
class tyluigiutils.url.IgnoredTarget
Bases: luigi.contrib.hdfs.target.HdfsTarget
Dummy target for use in Hadoop jobs that produce no explicit output file.
exists()
Returns True if the path for this FileSystemTarget exists; False otherwise.
This method is implemented by using fs.
open(mode='r')
Open the FileSystem target.
This method returns a file-like object which can either be read from or written to, depending on the specified mode.
Parameters: mode (str): the mode r opens the FileSystemTarget in read-only mode, whereas w opens it in write mode. Subclasses can implement additional options.
class tyluigiutils.url.UncheckedExternalURL(*args, **kwargs)
Bases: tyluigiutils.url.ExternalURL
An ExternalURL task that does not verify whether the source file exists, a check that can be expensive for S3 URLs.
tyluigiutils.url.get_hdfs_target_from_rel_path(path, use_hdfs_dir=False)
Transforms paths into HDFS URLs using hdfs_dir.
Parameters: use_hdfs_dir: if True, the hdfs_dir configuration is used to build the path.
tyluigiutils.url.get_hdfs_target_with_date(filename)
Returns the default Target, formatted as hdfs:///user/{username}/{hdfs_dir}/{date}/{filename}.
tyluigiutils.url.get_hdfs_target_with_specific_date(filename, date)
Returns the default Target, formatted as hdfs:///user/{username}/{hdfs_dir}/{date}/{filename}, using the date passed as an argument.
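The path layout above can be sketched as plain string formatting. hdfs_path_with_date is hypothetical: here username and hdfs_dir are passed explicitly, whereas the package presumably takes them from the current user and its configuration, and the ISO date format and today's-date default are assumptions.

```python
import datetime


def hdfs_path_with_date(username, hdfs_dir, filename, date=None):
    """Hypothetical sketch of the hdfs:///user/{username}/{hdfs_dir}/{date}/{filename} layout.

    Assumes ISO-formatted dates and today's date as the default.
    """
    date = date or datetime.date.today()
    return "hdfs:///user/{}/{}/{}/{}".format(
        username, hdfs_dir.strip("/"), date.isoformat(), filename
    )


print(hdfs_path_with_date("luigi", "reports", "hotels.csv", datetime.date(2017, 8, 31)))
# hdfs:///user/luigi/reports/2017-08-31/hotels.csv
```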
tyluigiutils.url.get_target_class_from_url(url)
Returns a Luigi target class based on the URL scheme.
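Scheme-based dispatch can be sketched with urllib.parse.urlsplit and a lookup table. To keep the sketch runnable without Luigi installed, it returns class names as strings; the names refer to real Luigi target classes, but which class tyluigiutils actually returns for each scheme is an assumption.

```python
from urllib.parse import urlsplit

# Hypothetical mapping from URL scheme to Luigi target class name.
TARGET_CLASS_BY_SCHEME = {
    "": "luigi.LocalTarget",  # plain paths such as /path/to/local/file.gz
    "file": "luigi.LocalTarget",
    "hdfs": "luigi.contrib.hdfs.HdfsTarget",
    "s3": "luigi.contrib.s3.S3Target",
}


def target_class_name_for(url):
    """Look up the target class name for the scheme of `url`."""
    scheme = urlsplit(url).scheme
    try:
        return TARGET_CLASS_BY_SCHEME[scheme]
    except KeyError:
        raise ValueError("unsupported URL scheme %r in %r" % (scheme, url))


for example in ["s3://some-bucket/path/to/file", "/path/to/local/file.gz", "hdfs://some/directory/"]:
    print(example, "->", target_class_name_for(example))
```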
tyluigiutils.url.normalize_hdfs_url(hdfs_path)
Normalizes an HDFS URL to return an HDFS path.
tyluigiutils.url.url_path_join(url, *extra_path)
Extend the path component of the given URL. Relative paths extend the existing path; absolute paths replace it. Special path elements like '.' and '..' are not treated any differently than any other path element.
Examples:
- url=http://foo.com/bar, extra_path=baz -> http://foo.com/bar/baz
- url=http://foo.com/bar, extra_path=/baz -> http://foo.com/baz
- url=http://foo.com/bar, extra_path=../baz -> http://foo.com/bar/../baz
Args:
- url (str): The URL to modify.
- extra_path (str): The path to join with the current URL path.
Returns:
- The URL with the path component joined with the extra_path argument.
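A minimal sketch built on urllib.parse and posixpath.join reproduces the three documented examples; it is not necessarily the package's source. posixpath.join already has the required semantics: an absolute component discards everything before it, and '..' is kept verbatim rather than resolved.

```python
import posixpath
from urllib.parse import urlsplit, urlunsplit


def url_path_join(url, *extra_path):
    """Join extra_path onto the path component of url, leaving scheme and host intact."""
    parts = urlsplit(url)
    joined_path = posixpath.join(parts.path, *extra_path)
    return urlunsplit(parts._replace(path=joined_path))


print(url_path_join("http://foo.com/bar", "baz"))     # http://foo.com/bar/baz
print(url_path_join("http://foo.com/bar", "/baz"))    # http://foo.com/baz
print(url_path_join("http://foo.com/bar", "../baz"))  # http://foo.com/bar/../baz
```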
tyluigiutils.path module
Module contents
Top-level package for TrustYou Luigi Utilities.